This is page 2 of 2. Use http://codebase.md/alexei-led/aws-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .github
│ └── workflows
│ ├── ci.yml
│ └── release.yml
├── .gitignore
├── CLAUDE.md
├── codecov.yml
├── deploy
│ └── docker
│ ├── docker-compose.yml
│ └── Dockerfile
├── docs
│ └── VERSION.md
├── LICENSE
├── Makefile
├── media
│ └── demo.mp4
├── pyproject.toml
├── README.md
├── security_config_example.yaml
├── smithery.yaml
├── spec.md
├── src
│ └── aws_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli_executor.py
│ ├── config.py
│ ├── prompts.py
│ ├── resources.py
│ ├── security.py
│ ├── server.py
│ └── tools.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── test_aws_live.py
│ │ ├── test_security_integration.py
│ │ └── test_server_integration.py
│ ├── test_aws_integration.py
│ ├── test_aws_setup.py
│ ├── test_bucket_creation.py
│ ├── test_run_integration.py
│ └── unit
│ ├── __init__.py
│ ├── test_cli_executor.py
│ ├── test_init.py
│ ├── test_main.py
│ ├── test_prompts.py
│ ├── test_resources.py
│ ├── test_security.py
│ ├── test_server.py
│ └── test_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/unit/test_security.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the security module."""
2 |
3 | from unittest.mock import mock_open, patch
4 |
5 | import pytest
6 | import yaml
7 |
8 | from aws_mcp_server.security import (
9 | DEFAULT_DANGEROUS_COMMANDS,
10 | DEFAULT_SAFE_PATTERNS,
11 | SecurityConfig,
12 | ValidationRule,
13 | check_regex_rules,
14 | is_service_command_safe,
15 | load_security_config,
16 | reload_security_config,
17 | validate_aws_command,
18 | validate_command,
19 | validate_pipe_command,
20 | )
21 |
22 |
23 | def test_is_service_command_safe():
24 | """Test the is_service_command_safe function."""
25 | # Test with known safe pattern
26 | assert is_service_command_safe("aws s3 ls", "s3") is True
27 |
28 | # Test with known dangerous pattern that has safe override
29 | assert is_service_command_safe("aws s3 ls --profile test", "s3") is True
30 |
31 | # Test with known dangerous pattern with no safe override
32 | assert is_service_command_safe("aws s3 rb s3://my-bucket", "s3") is False
33 |
34 | # Test with unknown service
35 | assert is_service_command_safe("aws unknown-service command", "unknown-service") is False
36 |
37 |
38 | def test_check_regex_rules():
39 | """Test the check_regex_rules function."""
40 | # Test with a pattern that should match
41 | with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
42 | mock_config.regex_rules = {
43 | "general": [
44 | ValidationRule(
45 | pattern=r"aws .* --profile\s+(root|admin|administrator)",
46 | description="Prevent use of sensitive profiles",
47 | error_message="Using sensitive profiles (root, admin) is restricted",
48 | regex=True,
49 | )
50 | ]
51 | }
52 |
53 | # Should match the rule
54 | error = check_regex_rules("aws s3 ls --profile root")
55 | assert error is not None
56 | assert "Using sensitive profiles" in error
57 |
58 | # Should not match
59 | assert check_regex_rules("aws s3 ls --profile user") is None
60 |
61 |
62 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
63 | def test_validate_aws_command_basic():
64 | """Test basic validation of AWS commands."""
65 | # Valid command should not raise
66 | validate_aws_command("aws s3 ls")
67 |
68 | # Invalid commands should raise ValueError
69 | with pytest.raises(ValueError, match="Commands must start with 'aws'"):
70 | validate_aws_command("s3 ls")
71 |
72 | with pytest.raises(ValueError, match="must include an AWS service"):
73 | validate_aws_command("aws")
74 |
75 |
76 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
77 | def test_validate_aws_command_dangerous():
78 | """Test validation of dangerous AWS commands."""
79 | # Use a test config
80 | with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
81 | mock_config.dangerous_commands = {
82 | "iam": ["aws iam create-user", "aws iam create-access-key"],
83 | "ec2": ["aws ec2 terminate-instances"],
84 | }
85 | mock_config.safe_patterns = {
86 | "iam": ["aws iam create-user --help"],
87 | "ec2": [],
88 | }
89 | mock_config.regex_rules = {}
90 |
91 | # Dangerous command should raise ValueError
92 | with pytest.raises(ValueError, match="restricted for security reasons"):
93 | validate_aws_command("aws iam create-user --user-name test-user")
94 |
95 | # Help on dangerous command should be allowed
96 | validate_aws_command("aws iam create-user --help")
97 |
98 | # Dangerous command with no safe override should raise
99 | with pytest.raises(ValueError, match="restricted for security reasons"):
100 | validate_aws_command("aws ec2 terminate-instances --instance-id i-12345")
101 |
102 |
103 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
104 | def test_validate_aws_command_regex():
105 | """Test validation of AWS commands with regex rules."""
106 | # Set up command for testing
107 | profile_command = "aws s3 ls --profile root"
108 | policy_command = """aws s3api put-bucket-policy --bucket my-bucket --policy "{\\"Version\\":\\"2012-10-17\\",\
109 | \\"Statement\\":[{\\"Effect\\":\\"Allow\\",\\"Principal\\":\\"*\\",\\"Action\\":\\"s3:GetObject\\",\
110 | \\"Resource\\":\\"arn:aws:s3:::my-bucket/*\\"}]}" """
111 |
112 | # We need to patch both the check_regex_rules function and the config
113 | with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
114 | mock_config.dangerous_commands = {}
115 | mock_config.safe_patterns = {}
116 |
117 | # Test for the root profile check
118 | with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
119 | mock_check.return_value = "Using sensitive profiles is restricted"
120 |
121 | with pytest.raises(ValueError, match="Using sensitive profiles is restricted"):
122 | validate_aws_command(profile_command)
123 |
124 | # Verify check_regex_rules was called
125 | mock_check.assert_called_once()
126 |
127 | # Test for the bucket policy check
128 | with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
129 | # Have the mock return error for the policy command
130 | mock_check.return_value = "Creating public bucket policies is restricted"
131 |
132 | with pytest.raises(ValueError, match="Creating public bucket policies is restricted"):
133 | validate_aws_command(policy_command)
134 |
135 | # Verify check_regex_rules was called
136 | mock_check.assert_called_once()
137 |
138 |
139 | @patch("aws_mcp_server.security.SECURITY_MODE", "permissive")
140 | def test_validate_aws_command_permissive():
141 | """Test validation of AWS commands in permissive mode."""
142 | # In permissive mode, dangerous commands should be allowed
143 | with patch("aws_mcp_server.security.logger.warning") as mock_warning:
144 | validate_aws_command("aws iam create-user --user-name test-user")
145 | mock_warning.assert_called_once()
146 |
147 |
148 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
149 | def test_validate_pipe_command():
150 | """Test validation of piped commands."""
151 | # Mock the validate_aws_command and validate_unix_command functions
152 | with patch("aws_mcp_server.security.validate_aws_command") as mock_aws_validate:
153 | with patch("aws_mcp_server.security.validate_unix_command") as mock_unix_validate:
154 | # Set up return values
155 | mock_unix_validate.return_value = True
156 |
157 | # Test valid piped command
158 | validate_pipe_command("aws s3 ls | grep bucket")
159 | mock_aws_validate.assert_called_once_with("aws s3 ls")
160 |
161 | # Reset mocks
162 | mock_aws_validate.reset_mock()
163 | mock_unix_validate.reset_mock()
164 |
165 | # Test command with unrecognized Unix command
166 | mock_unix_validate.return_value = False
167 | with pytest.raises(ValueError, match="not allowed"):
168 | validate_pipe_command("aws s3 ls | unknown_command")
169 |
170 | # Empty command should raise
171 | with pytest.raises(ValueError, match="Empty command"):
172 | validate_pipe_command("")
173 |
174 | # Empty second command test
175 | # Configure split_pipe_command to return a list with an empty second command
176 | with patch("aws_mcp_server.security.split_pipe_command") as mock_split_pipe:
177 | mock_split_pipe.return_value = ["aws s3 ls", ""]
178 | with pytest.raises(ValueError, match="Empty command at position"):
179 | validate_pipe_command("aws s3 ls | ")
180 |
181 |
182 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
183 | def test_validate_command():
184 | """Test the centralized validate_command function."""
185 | # Simple AWS command
186 | validate_command("aws s3 ls")
187 |
188 | # Piped command
189 | validate_command("aws s3 ls | grep bucket")
190 |
191 | # Invalid command
192 | with pytest.raises(ValueError):
193 | validate_command("s3 ls")
194 |
195 |
196 | def test_load_security_config_default():
197 | """Test loading security configuration with defaults."""
198 | with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", ""):
199 | config = load_security_config()
200 |
201 | # Should have loaded default values
202 | assert config.dangerous_commands == DEFAULT_DANGEROUS_COMMANDS
203 | assert config.safe_patterns == DEFAULT_SAFE_PATTERNS
204 |
205 | # Should have regex rules converted from DEFAULT_REGEX_RULES
206 | assert "general" in config.regex_rules
207 | assert len(config.regex_rules["general"]) > 0
208 | assert isinstance(config.regex_rules["general"][0], ValidationRule)
209 |
210 |
211 | def test_load_security_config_custom():
212 | """Test loading security configuration from a custom file."""
213 | # Mock YAML file contents
214 | test_config = {
215 | "dangerous_commands": {"test_service": ["aws test_service dangerous_command"]},
216 | "safe_patterns": {"test_service": ["aws test_service safe_pattern"]},
217 | "regex_rules": {"test_service": [{"pattern": "test_pattern", "description": "Test description", "error_message": "Test error message"}]},
218 | }
219 |
220 | # Mock the open function to return our test config
221 | with patch("builtins.open", mock_open(read_data=yaml.dump(test_config))):
222 | with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", "/fake/path.yaml"):
223 | with patch("pathlib.Path.exists", return_value=True):
224 | config = load_security_config()
225 |
226 | # Should have our custom values
227 | assert "test_service" in config.dangerous_commands
228 | assert "test_service" in config.safe_patterns
229 | assert "test_service" in config.regex_rules
230 | assert config.regex_rules["test_service"][0].pattern == "test_pattern"
231 |
232 |
233 | def test_load_security_config_error():
234 | """Test error handling when loading security configuration."""
235 | with patch("builtins.open", side_effect=Exception("Test error")):
236 | with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", "/fake/path.yaml"):
237 | with patch("pathlib.Path.exists", return_value=True):
238 | with patch("aws_mcp_server.security.logger.error") as mock_error:
239 | with patch("aws_mcp_server.security.logger.warning") as mock_warning:
240 | config = load_security_config()
241 |
242 | # Should log error and warning
243 | mock_error.assert_called_once()
244 | mock_warning.assert_called_once()
245 |
246 | # Should still have default values
247 | assert config.dangerous_commands == DEFAULT_DANGEROUS_COMMANDS
248 |
249 |
250 | def test_reload_security_config():
251 | """Test reloading security configuration."""
252 | with patch("aws_mcp_server.security.load_security_config") as mock_load:
253 | mock_load.return_value = SecurityConfig(dangerous_commands={"test": ["test"]}, safe_patterns={"test": ["test"]})
254 |
255 | reload_security_config()
256 |
257 | # Should have called load_security_config
258 | mock_load.assert_called_once()
259 |
260 |
261 | # Integration-like tests for specific dangerous commands
262 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
263 | def test_specific_dangerous_commands():
264 | """Test validation of specific dangerous commands."""
265 | # Configure the SECURITY_CONFIG with some dangerous commands
266 | with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
267 | mock_config.dangerous_commands = {
268 | "iam": ["aws iam create-user", "aws iam create-access-key", "aws iam attach-user-policy"],
269 | "ec2": ["aws ec2 terminate-instances"],
270 | "s3": ["aws s3 rb"],
271 | "rds": ["aws rds delete-db-instance"],
272 | }
273 | mock_config.safe_patterns = {
274 | "iam": ["aws iam get-", "aws iam list-"],
275 | "ec2": ["aws ec2 describe-"],
276 | "s3": ["aws s3 ls"],
277 | "rds": ["aws rds describe-"],
278 | }
279 | mock_config.regex_rules = {}
280 |
281 | # IAM dangerous commands
282 | with pytest.raises(ValueError, match="restricted for security reasons"):
283 | validate_aws_command("aws iam create-user --user-name test-user")
284 |
285 | with pytest.raises(ValueError, match="restricted for security reasons"):
286 | validate_aws_command("aws iam create-access-key --user-name test-user")
287 |
288 | with pytest.raises(ValueError, match="restricted for security reasons"):
289 | validate_aws_command("aws iam attach-user-policy --user-name test-user --policy-arn arn:aws:iam::aws:policy/AdministratorAccess")
290 |
291 | # EC2 dangerous commands
292 | with pytest.raises(ValueError, match="restricted for security reasons"):
293 | validate_aws_command("aws ec2 terminate-instances --instance-ids i-12345")
294 |
295 | # S3 dangerous commands
296 | with pytest.raises(ValueError, match="restricted for security reasons"):
297 | validate_aws_command("aws s3 rb s3://my-bucket --force")
298 |
299 | # RDS dangerous commands
300 | with pytest.raises(ValueError, match="restricted for security reasons"):
301 | validate_aws_command("aws rds delete-db-instance --db-instance-identifier my-db --skip-final-snapshot")
302 |
303 |
304 | # Tests for safe patterns overriding dangerous commands
305 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
306 | def test_safe_overrides():
307 | """Test safe patterns that override dangerous commands."""
308 | # IAM help commands should be allowed even if potentially dangerous
309 | validate_aws_command("aws iam --help")
310 | validate_aws_command("aws iam help")
311 | validate_aws_command("aws iam get-user --user-name test-user")
312 | validate_aws_command("aws iam list-users")
313 |
314 | # EC2 describe commands should be allowed
315 | validate_aws_command("aws ec2 describe-instances")
316 |
317 | # S3 list commands should be allowed
318 | validate_aws_command("aws s3 ls")
319 | validate_aws_command("aws s3api list-buckets")
320 |
321 |
322 | # Tests for complex regex patterns
323 | @patch("aws_mcp_server.security.SECURITY_MODE", "strict")
324 | def test_complex_regex_patterns():
325 | """Test more complex regex patterns."""
326 | # Instead of testing the regex directly, test the behavior we expect
327 | dangerous_sg_command = "aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 22 --cidr 0.0.0.0/0"
328 | safe_sg_command_80 = "aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 80 --cidr 0.0.0.0/0"
329 |
330 | # Define the validation rule directly
331 | ValidationRule(
332 | pattern=r"aws ec2 authorize-security-group-ingress.*--cidr\s+0\.0\.0\.0/0.*--port\s+(?!80|443)\d+",
333 | description="Prevent open security groups for non-web ports",
334 | error_message="Security group error",
335 | regex=True,
336 | )
337 |
338 | # Test with mocked check_regex_rules
339 | with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
340 | mock_config.dangerous_commands = {}
341 | mock_config.safe_patterns = {}
342 |
343 | with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
344 | # Set up mock to return error for the dangerous command
345 | mock_check.side_effect = lambda cmd, svc=None: "Security group error" if "--port 22" in cmd else None
346 |
347 | # Test dangerous command raises error
348 | with pytest.raises(ValueError, match="Security group error"):
349 | validate_aws_command(dangerous_sg_command)
350 |
351 | # Test safe command doesn't raise
352 | mock_check.reset_mock()
353 | mock_check.return_value = None # Explicit safe return
354 | validate_aws_command(safe_sg_command_80) # Should not raise
355 |
```
--------------------------------------------------------------------------------
/security_config_example.yaml:
--------------------------------------------------------------------------------
```yaml
1 | # AWS MCP Server Security Configuration Example
2 | # Place this file at a location specified by AWS_MCP_SECURITY_CONFIG environment variable
3 |
4 | # ---------------------------------------------------------------------------------
5 | # 🔒 Security Rules Overview 🔒
6 | # ---------------------------------------------------------------------------------
7 | # The AWS MCP Server security system uses three layers of protection:
8 | #
9 | # 1. DANGEROUS_COMMANDS: Block specific commands that could compromise security
10 | # or lead to account takeover, privilege escalation, or audit tampering
11 | #
12 | # 2. SAFE_PATTERNS: Allow read-only and explicitly safe operations that
13 | # match dangerous patterns but are needed for normal operation
14 | #
15 | # 3. REGEX_RULES: Complex pattern matching for security risks that can't
16 | # be captured by simple command patterns
17 | #
18 | # How the layers work together:
19 | # - First, the system checks if a command matches any dangerous pattern
20 | # - If it does, the system then checks if it matches any safe pattern
21 | # - If it matches a safe pattern, it's allowed despite being dangerous
22 | # - Finally, the command is checked against all regex rules
23 | # - Any match with a regex rule will block the command, regardless of other checks
24 | #
25 | # Security Mode:
26 | # - Set AWS_MCP_SECURITY_MODE=strict (default) to enforce all rules
27 | # - Set AWS_MCP_SECURITY_MODE=permissive to log warnings but allow execution
28 | # ---------------------------------------------------------------------------------
29 |
30 | # ---------------------------------------------------------------------------------
31 | # 🔑 Identity and Access Control Security Rules
32 | # ---------------------------------------------------------------------------------
33 | # These rules focus on preventing identity-based attacks such as:
34 | # - Account takeover via creation of unauthorized users/credentials
35 | # - Privilege escalation by attaching permissive policies
36 | # - Credential exposure through access key creation
37 | # - Console password creation and MFA device manipulation
38 | # ---------------------------------------------------------------------------------
39 |
40 | # Commands considered dangerous by security category
41 | # Keys are AWS service names, values are lists of command prefixes to block
42 | dangerous_commands:
43 | # Identity and Access Management - core of security
44 | iam:
45 | # User management (potential backdoor accounts)
46 | - "aws iam create-user" # Creates new IAM users that could persist after compromise
47 | - "aws iam update-user" # Updates existing user properties
48 |
49 | # Credential management (theft risk)
50 | - "aws iam create-access-key" # Creates long-term credentials that can be exfiltrated
51 | - "aws iam update-access-key" # Changes status of access keys (enabling/disabling)
52 | - "aws iam create-login-profile" # Creates console passwords for existing users
53 | - "aws iam update-login-profile" # Updates console passwords
54 |
55 | # Authentication controls
56 | - "aws iam create-virtual-mfa-device" # Creates new MFA devices
57 | - "aws iam deactivate-mfa-device" # Removes MFA protection from accounts
58 | - "aws iam delete-virtual-mfa-device" # Deletes MFA devices
59 | - "aws iam enable-mfa-device" # Enables/associates MFA devices
60 |
61 | # Privilege escalation via policy manipulation
62 | - "aws iam attach-user-policy" # Attaches managed policies to users
63 | - "aws iam attach-role-policy" # Attaches managed policies to roles
64 | - "aws iam attach-group-policy" # Attaches managed policies to groups
65 | - "aws iam create-policy" # Creates new managed policies
66 | - "aws iam create-policy-version" # Creates new versions of managed policies
67 | - "aws iam set-default-policy-version" # Changes active policy version
68 |
69 | # Inline policy manipulation (harder to detect)
70 | - "aws iam put-user-policy" # Creates/updates inline policies for users
71 | - "aws iam put-role-policy" # Creates/updates inline policies for roles
72 | - "aws iam put-group-policy" # Creates/updates inline policies for groups
73 |
74 | # Trust relationship manipulation
75 | - "aws iam update-assume-role-policy" # Changes who can assume a role
76 | - "aws iam update-role" # Updates role properties
77 |
78 | # Security Token Service - temporary credentials
79 | sts:
80 | - "aws sts assume-role" # Assumes roles with potentially higher privileges
81 | - "aws sts get-federation-token" # Gets federated access tokens
82 |
83 | # AWS Organizations - multi-account management
84 | organizations:
85 | - "aws organizations create-account" # Creates new AWS accounts
86 | - "aws organizations invite-account-to-organization" # Brings accounts under org control
87 | - "aws organizations leave-organization" # Removes accounts from organization
88 | - "aws organizations remove-account-from-organization" # Removes accounts from organization
89 | - "aws organizations disable-policy-type" # Disables policy enforcement
90 | - "aws organizations create-policy" # Creates organization policies
91 | - "aws organizations attach-policy" # Attaches organization policies
92 |
93 | # ---------------------------------------------------------------------------------
94 | # 🔍 Audit and Logging Security Rules
95 | # ---------------------------------------------------------------------------------
96 | # These rules prevent attackers from covering their tracks by:
97 | # - Disabling or deleting audit logs (CloudTrail)
98 | # - Turning off compliance monitoring (Config)
99 | # - Disabling threat detection (GuardDuty)
100 | # - Removing alarm systems (CloudWatch)
101 | # ---------------------------------------------------------------------------------
102 |
103 | # CloudTrail - AWS activity logging
104 | cloudtrail:
105 | - "aws cloudtrail delete-trail" # Removes audit trail completely
106 | - "aws cloudtrail stop-logging" # Stops collecting audit logs
107 | - "aws cloudtrail update-trail" # Modifies logging settings (e.g., disabling logging)
108 | - "aws cloudtrail put-event-selectors" # Changes what events are logged
109 | - "aws cloudtrail delete-event-data-store" # Deletes storage for CloudTrail events
110 |
111 | # AWS Config - configuration monitoring
112 | config:
113 | - "aws configservice delete-configuration-recorder" # Removes configuration tracking
114 | - "aws configservice stop-configuration-recorder" # Stops recording configuration changes
115 | - "aws configservice delete-delivery-channel" # Stops delivering configuration snapshots
116 | - "aws configservice delete-remediation-configuration" # Removes auto-remediation
117 |
118 | # GuardDuty - threat detection
119 | guardduty:
120 | - "aws guardduty delete-detector" # Disables threat detection completely
121 | - "aws guardduty disable-organization-admin-account" # Disables central security
122 | - "aws guardduty update-detector" # Modifies threat detection settings
123 |
124 | # CloudWatch - monitoring and alerting
125 | cloudwatch:
126 | - "aws cloudwatch delete-alarms" # Removes security alarm configurations
127 | - "aws cloudwatch disable-alarm-actions" # Disables alarm action triggers
128 | - "aws cloudwatch delete-dashboards" # Removes monitoring dashboards
129 |
130 | # ---------------------------------------------------------------------------------
131 | # 🔐 Data Security Rules
132 | # ---------------------------------------------------------------------------------
133 | # These rules prevent data exposure through:
134 | # - Secret and encryption key management
135 | # - Storage bucket permission controls
136 | # - Encryption settings management
137 | # ---------------------------------------------------------------------------------
138 |
139 | # Secrets Manager - sensitive credential storage
140 | secretsmanager:
141 | - "aws secretsmanager put-secret-value" # Changes stored secrets
142 | - "aws secretsmanager update-secret" # Updates secret properties
143 | - "aws secretsmanager restore-secret" # Restores deleted secrets
144 | - "aws secretsmanager delete-secret" # Removes sensitive secrets
145 |
146 | # KMS - encryption key management
147 | kms:
148 | - "aws kms disable-key" # Disables encryption keys
149 | - "aws kms delete-alias" # Removes key aliases
150 | - "aws kms schedule-key-deletion" # Schedules deletion of encryption keys
151 | - "aws kms cancel-key-deletion" # Cancels pending key deletion
152 | - "aws kms revoke-grant" # Revokes permissions to use keys
153 |
154 | # S3 - object storage security
155 | s3:
156 | - "aws s3api put-bucket-policy" # Changes bucket permissions
157 | - "aws s3api put-bucket-acl" # Changes bucket access controls
158 | - "aws s3api delete-bucket-policy" # Removes bucket protection policies
159 | - "aws s3api delete-bucket-encryption" # Removes encryption settings
160 | - "aws s3api put-public-access-block" # Changes public access settings
161 |
162 | # ---------------------------------------------------------------------------------
163 | # 🌐 Network Security Rules
164 | # ---------------------------------------------------------------------------------
165 | # These rules prevent network-based attacks through:
166 | # - Security group modification (firewall rules)
167 | # - Network ACL changes
168 | # - VPC endpoint manipulation
169 | # ---------------------------------------------------------------------------------
170 |
171 | # EC2 network security
172 | ec2:
173 | - "aws ec2 authorize-security-group-ingress" # Opens inbound network access
174 | - "aws ec2 authorize-security-group-egress" # Opens outbound network access
175 | - "aws ec2 revoke-security-group-ingress" # Removes inbound security rules
176 | - "aws ec2 revoke-security-group-egress" # Removes outbound security rules
177 | - "aws ec2 modify-vpc-endpoint" # Changes VPC endpoint settings
178 | - "aws ec2 create-flow-logs" # Creates network flow logs
179 | - "aws ec2 delete-flow-logs" # Removes network flow logs
180 | - "aws ec2 modify-instance-attribute" # Changes security attributes of instances
181 |
182 | # ---------------------------------------------------------------------------------
183 | # ✓ Safe Patterns
184 | # ---------------------------------------------------------------------------------
185 | # These patterns explicitly allow read-only operations that don't modify resources
186 | # and pose minimal or no security risk, even if they match dangerous patterns.
187 | # ---------------------------------------------------------------------------------
188 |
189 | # Safe patterns that override dangerous commands
190 | safe_patterns:
191 | # Universal safe patterns for any service
192 | general:
193 | - "--help" # Getting command help documentation
194 | - "help" # Getting command help documentation
195 | - "--version" # Checking AWS CLI version
196 | - "--dry-run" # Testing without making changes
197 | - "--generate-cli-skeleton" # Generating skeleton templates
198 |
199 | # Read-only IAM operations
200 | iam:
201 | - "aws iam get-" # All get operations (reading resources)
202 | - "aws iam list-" # All list operations (listing resources)
203 | - "aws iam generate-" # Report generation
204 | - "aws iam simulate-" # Policy simulation (no changes)
205 | - "aws iam tag-" # Adding organizational tags is generally safe
206 |
207 | # Read-only STS operations
208 | sts:
209 | - "aws sts get-caller-identity" # Checking current identity
210 | - "aws sts decode-authorization-message" # Decoding error messages
211 |
212 | # Read-only Organizations operations
213 | organizations:
214 | - "aws organizations describe-" # Reading organization details
215 | - "aws organizations list-" # Listing organization resources
216 |
217 | # Read-only CloudTrail operations
218 | cloudtrail:
219 | - "aws cloudtrail describe-" # Reading trail configurations
220 | - "aws cloudtrail get-" # Getting trail settings
221 | - "aws cloudtrail list-" # Listing trails/events
222 | - "aws cloudtrail lookup-events" # Searching audit events
223 |
224 | # Read-only AWS Config operations
225 | config:
226 | - "aws configservice describe-" # Reading configuration details
227 | - "aws configservice get-" # Getting configuration settings
228 | - "aws configservice list-" # Listing configuration resources
229 | - "aws configservice select-resource-config" # Querying resources
230 |
231 | # Read-only GuardDuty operations
232 | guardduty:
233 | - "aws guardduty describe-" # Reading detector configurations
234 | - "aws guardduty get-" # Getting detector settings/findings
235 | - "aws guardduty list-" # Listing detectors/findings
236 |
237 | # Read-only CloudWatch operations
238 | cloudwatch:
239 | - "aws cloudwatch describe-" # Reading alarm configurations
240 | - "aws cloudwatch get-" # Getting metric data
241 | - "aws cloudwatch list-" # Listing metrics/alarms
242 |
243 | # Read-only Secrets Manager operations
244 | secretsmanager:
245 | - "aws secretsmanager list-" # Listing secrets (metadata only)
246 | - "aws secretsmanager describe-" # Reading metadata about secrets
247 |
248 | # Read-only KMS operations
249 | kms:
250 | - "aws kms describe-" # Reading key details
251 | - "aws kms get-" # Getting key settings
252 | - "aws kms list-" # Listing keys and aliases
253 |
254 | # Read-only S3 operations
255 | s3:
256 | - "aws s3 ls" # Listing buckets/objects
257 | - "aws s3api get-" # Getting bucket settings/objects
258 | - "aws s3api list-" # Listing buckets/objects
259 | - "aws s3api head-" # Getting object metadata
260 |
261 | # Read-only EC2 network operations
262 | ec2:
263 | - "aws ec2 describe-" # Reading network configurations
264 | - "aws ec2 get-" # Getting network settings
265 |
266 | # ---------------------------------------------------------------------------------
267 | # 🔎 Regex Pattern Rules
268 | # ---------------------------------------------------------------------------------
269 | # These complex patterns detect security risks that can't be caught with simple
270 | # command prefix matching. They use regular expressions to identify risky
271 | # command patterns that could compromise security.
272 | # ---------------------------------------------------------------------------------
273 |
274 | # Complex pattern matching using regular expressions
275 | regex_rules:
276 | # Global security patterns (apply to all services)
277 | general:
278 | # Identity and authentication risks
279 | - pattern: "aws .* --profile\\s+(root|admin|administrator)"
280 | description: "Prevent use of sensitive profiles"
281 | error_message: "Using sensitive profiles (root, admin) is restricted for security reasons."
282 |
283 | # Protocol security risks
284 | - pattern: "aws .* --no-verify-ssl"
285 | description: "Prevent disabling SSL verification"
286 | error_message: "Disabling SSL verification is not allowed for security reasons."
287 |
288 | # Data exposure risks
289 | - pattern: "aws .* --output\\s+text\\s+.*--query\\s+.*Password"
290 | description: "Prevent password exposure in text output"
291 | error_message: "Outputting sensitive data like passwords in text format is restricted."
292 |
293 | # Debug mode risks
294 | - pattern: "aws .* --debug"
295 | description: "Prevent debug mode which shows sensitive info"
296 | error_message: "Debug mode is restricted as it may expose sensitive information."
297 |
298 | # IAM-specific security patterns
299 | iam:
300 | # Privileged user creation
301 | - pattern: "aws iam create-user.*--user-name\\s+(root|admin|administrator|backup|security|finance|billing)"
302 | description: "Prevent creation of privileged-sounding users"
303 | error_message: "Creating users with sensitive names is restricted for security reasons."
304 |
305 | # Privilege escalation via policies
306 | - pattern: "aws iam attach-user-policy.*--policy-arn\\s+.*Administrator"
307 | description: "Prevent attaching Administrator policies"
308 | error_message: "Attaching Administrator policies is restricted for security reasons."
309 |
310 | - pattern: "aws iam attach-user-policy.*--policy-arn\\s+.*FullAccess"
311 | description: "Prevent attaching FullAccess policies to users"
312 | error_message: "Attaching FullAccess policies directly to users is restricted (use roles instead)."
313 |
314 | # Unrestricted permissions in policies
315 | - pattern: "aws iam create-policy.*\"Effect\":\\s*\"Allow\".*\"Action\":\\s*\"\*\".*\"Resource\":\\s*\"\*\""
316 | description: "Prevent creation of policies with * permissions"
317 | error_message: "Creating policies with unrestricted (*) permissions is not allowed."
318 |
319 | # Password policy weakening
320 | - pattern: "aws iam create-login-profile.*--password-reset-required\\s+false"
321 | description: "Enforce password reset for new profiles"
322 | error_message: "Creating login profiles without requiring password reset is restricted."
323 |
324 | - pattern: "aws iam update-account-password-policy.*--require-uppercase-characters\\s+false"
325 | description: "Prevent weakening password policies"
326 | error_message: "Weakening account password policies is restricted."
327 |
328 | # S3 security patterns
329 | s3:
330 | # Public bucket exposure
331 | - pattern: "aws s3api put-bucket-policy.*\"Effect\":\\s*\"Allow\".*\"Principal\":\\s*\"\*\""
332 | description: "Prevent public bucket policies"
333 | error_message: "Creating public bucket policies is restricted for security reasons."
334 |
335 | # Disabling public access blocks
336 | - pattern: "aws s3api put-public-access-block.*--public-access-block-configuration\\s+.*\"BlockPublicAcls\":\\s*false"
337 | description: "Prevent disabling public access blocks"
338 | error_message: "Disabling S3 public access blocks is restricted for security reasons."
339 |
340 | # Public bucket creation outside approved regions
341 | - pattern: "aws s3api create-bucket.*--region\\s+(?!eu|us-east-1).*--acl\\s+public"
342 | description: "Prevent public buckets outside of allowed regions"
343 | error_message: "Creating public buckets outside allowed regions is restricted."
344 |
345 | # EC2 network security patterns
346 | ec2:
347 | # Open security groups for sensitive ports
348 | - pattern: "aws ec2 authorize-security-group-ingress.*--cidr\\s+0\\.0\\.0\\.0/0.*--port\\s+(?!80|443)[0-9]+"
349 | description: "Prevent open security groups for non-web ports"
350 | error_message: "Opening non-web ports to the entire internet (0.0.0.0/0) is restricted."
351 |
352 | # Unsafe user-data scripts
353 | - pattern: "aws ec2 run-instances.*--user-data\\s+.*curl.*\\|.*sh"
354 | description: "Detect potentially unsafe user-data scripts"
355 | error_message: "Running scripts from remote sources in user-data presents security risks."
356 |
357 | # CloudTrail integrity patterns
358 | cloudtrail:
359 | # Disabling global event logging
360 | - pattern: "aws cloudtrail update-trail.*--no-include-global-service-events"
361 | description: "Prevent disabling global event logging"
362 | error_message: "Disabling CloudTrail logging for global service events is restricted."
363 |
364 | # Making trails single-region
365 | - pattern: "aws cloudtrail update-trail.*--no-multi-region"
366 | description: "Prevent making trails single-region"
367 | error_message: "Changing CloudTrail trails from multi-region to single-region is restricted."
```
--------------------------------------------------------------------------------
/tests/unit/test_cli_executor.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the CLI executor module."""
2 |
3 | import asyncio
4 | from unittest.mock import AsyncMock, MagicMock, patch
5 |
6 | import pytest
7 |
8 | from aws_mcp_server.cli_executor import (
9 | CommandExecutionError,
10 | CommandValidationError,
11 | check_aws_cli_installed,
12 | execute_aws_command,
13 | execute_pipe_command,
14 | get_command_help,
15 | is_auth_error,
16 | )
17 | from aws_mcp_server.config import DEFAULT_TIMEOUT, MAX_OUTPUT_SIZE
18 |
19 |
@pytest.mark.asyncio
async def test_execute_aws_command_success():
    """Test successful command execution."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Simulate a process that exits cleanly with output on stdout.
        fake_process = AsyncMock()
        fake_process.returncode = 0
        fake_process.communicate.return_value = (b"Success output", b"")
        mock_subprocess.return_value = fake_process

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "success"
        assert result["output"] == "Success output"
        # The command string must be tokenized and launched exec-style (no shell).
        mock_subprocess.assert_called_once_with("aws", "s3", "ls", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
35 |
36 |
@pytest.mark.asyncio
async def test_execute_aws_command_ec2_with_region_added():
    """Test that region is automatically added to EC2 commands."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # A clean exit with EC2 output on stdout.
        fake_process = AsyncMock()
        fake_process.returncode = 0
        fake_process.communicate.return_value = (b"EC2 instances", b"")
        mock_subprocess.return_value = fake_process

        # Import here to ensure the test uses the actual value
        from aws_mcp_server.config import AWS_REGION

        # No --region supplied by the caller; the executor is expected to inject one.
        result = await execute_aws_command("aws ec2 describe-instances")

        assert result["status"] == "success"
        assert result["output"] == "EC2 instances"

        # Inspect the positional arguments the subprocess was launched with.
        mock_subprocess.assert_called_once()
        launched_args = mock_subprocess.call_args[0]
        assert launched_args[:3] == ("aws", "ec2", "describe-instances")
        assert "--region" in launched_args
        assert AWS_REGION in launched_args
64 |
65 |
@pytest.mark.asyncio
async def test_execute_aws_command_with_custom_timeout():
    """Test command execution with custom timeout.

    Verifies that a caller-supplied timeout is forwarded to asyncio.wait_for,
    whether it is passed positionally or as a keyword argument.
    """
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        process_mock = AsyncMock()
        process_mock.returncode = 0
        process_mock.communicate.return_value = (b"Success output", b"")
        mock_subprocess.return_value = process_mock

        # Use a custom timeout
        custom_timeout = 120
        with patch("asyncio.wait_for") as mock_wait_for:
            mock_wait_for.return_value = (b"Success output", b"")
            await execute_aws_command("aws s3 ls", timeout=custom_timeout)

            # Check that wait_for was called with the custom timeout.
            mock_wait_for.assert_called_once()
            args, kwargs = mock_wait_for.call_args
            # Bug fix: the original expression `kwargs.get("timeout") == custom_timeout
            # or args[1] == custom_timeout` raised IndexError (not a clean assertion
            # failure) when wait_for was called with a single positional argument and
            # no `timeout` keyword. Extract the timeout defensively instead.
            timeout_arg = kwargs.get("timeout", args[1] if len(args) > 1 else None)
            assert timeout_arg == custom_timeout
85 |
86 |
@pytest.mark.asyncio
async def test_execute_aws_command_error():
    """Test command execution error."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Simulate a process exiting non-zero with a message on stderr.
        fake_process = AsyncMock()
        fake_process.returncode = 1
        communicate_mock = AsyncMock(return_value=(b"", b"Error message"))
        fake_process.communicate = communicate_mock
        mock_subprocess.return_value = fake_process

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "error"
        assert result["output"] == "Error message"
        # The executor must have consumed the process output exactly once.
        communicate_mock.assert_called_once()
106 |
107 |
@pytest.mark.asyncio
async def test_execute_aws_command_auth_error():
    """Test command execution with authentication error."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Simulate the CLI failing because no credentials are available.
        fake_process = AsyncMock()
        fake_process.returncode = 1
        fake_process.communicate.return_value = (b"", b"Unable to locate credentials")
        mock_subprocess.return_value = fake_process

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "error"
        # Auth failures should be wrapped with user-facing guidance.
        for fragment in ("Authentication error", "Unable to locate credentials", "Please check your AWS credentials"):
            assert fragment in result["output"]
124 |
125 |
@pytest.mark.asyncio
async def test_execute_aws_command_timeout():
    """Test command timeout."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # communicate() never finishes within the allotted time.
        fake_process = AsyncMock()
        fake_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        # kill() is synchronous on real processes, so use a plain MagicMock.
        fake_process.kill = MagicMock()
        mock_subprocess.return_value = fake_process

        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls", timeout=1)

        # The error surfaces the timeout duration...
        assert "Command timed out after 1 seconds" in str(excinfo.value)
        # ...and the stuck process is terminated.
        fake_process.kill.assert_called_once()
148 |
149 |
@pytest.mark.asyncio
async def test_execute_aws_command_kill_failure():
    """Test failure to kill process after timeout."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # communicate() times out, and kill() itself blows up afterwards.
        fake_process = AsyncMock()
        fake_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        # kill() is not async on real processes, hence a plain MagicMock.
        fake_process.kill = MagicMock(side_effect=Exception("Failed to kill process"))
        mock_subprocess.return_value = fake_process

        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls", timeout=1)

        # The kill failure must not mask the original timeout error.
        assert "Command timed out after 1 seconds" in str(excinfo.value)
168 |
169 |
@pytest.mark.asyncio
async def test_execute_aws_command_general_exception():
    """Test handling of general exceptions during command execution."""
    # Subprocess creation itself fails (e.g. binary missing, fork error).
    with patch("asyncio.create_subprocess_exec", side_effect=Exception("Test exception")):
        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls")

        message = str(excinfo.value)
        assert "Failed to execute command" in message
        assert "Test exception" in message
179 |
180 |
@pytest.mark.asyncio
async def test_execute_aws_command_truncate_output():
    """Test truncation of large outputs."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        fake_process = AsyncMock()
        fake_process.returncode = 0
        # Output exceeding MAX_OUTPUT_SIZE must be cut down by the executor.
        oversized = "x" * (MAX_OUTPUT_SIZE + 1000)
        fake_process.communicate.return_value = (oversized.encode("utf-8"), b"")
        mock_subprocess.return_value = fake_process

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "success"
        # Allow some slack for the appended truncation notice.
        assert len(result["output"]) <= MAX_OUTPUT_SIZE + 100
        assert "output truncated" in result["output"]
199 |
200 |
@pytest.mark.parametrize(
    "error_message,expected_result",
    [
        # Positive cases: substrings that identify credential/authentication failures
        ("Unable to locate credentials", True),
        ("Some text before ExpiredToken and after", True),
        ("Error: AccessDenied when attempting to perform operation", True),
        ("AuthFailure: credentials could not be verified", True),
        ("The security token included in the request is invalid", True),
        ("The config profile could not be found", True),
        # Negative cases: ordinary AWS errors that are not auth-related
        ("S3 bucket not found", False),
        ("Resource not found: myresource", False),
        ("Invalid parameter value", False),
    ],
)
def test_is_auth_error(error_message, expected_result):
    """Test the is_auth_error function with various error messages."""
    assert is_auth_error(error_message) == expected_result
220 |
221 |
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "returncode,stdout,stderr,exception,expected_result",
    [
        # CLI installed
        (0, b"aws-cli/2.15.0", b"", None, True),
        # CLI not installed - command not found
        (127, b"", b"command not found", None, False),
        # CLI error case
        (1, b"", b"some error", None, False),
        # Exception during command execution
        (None, None, None, Exception("Test exception"), False),
    ],
)
async def test_check_aws_cli_installed(returncode, stdout, stderr, exception, expected_result):
    """Test check_aws_cli_installed function with various scenarios."""
    if exception is not None:
        # Subprocess creation itself raises; the helper should report False.
        with patch("asyncio.create_subprocess_exec", side_effect=exception):
            assert await check_aws_cli_installed() is expected_result
        return

    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        fake_process = AsyncMock()
        fake_process.returncode = returncode
        fake_process.communicate.return_value = (stdout, stderr)
        mock_subprocess.return_value = fake_process

        assert await check_aws_cli_installed() is expected_result

        if returncode == 0:  # Only verify call args for success case to avoid redundancy
            mock_subprocess.assert_called_once_with("aws", "--version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
254 |
255 |
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "service,command,mock_type,mock_value,expected_text,expected_call",
    [
        # Successful help retrieval with service and command
        ("s3", "ls", "return_value", {"status": "success", "output": "Help text"}, "Help text", "aws s3 ls help"),
        # Successful help retrieval with service only
        ("s3", None, "return_value", {"status": "success", "output": "Help text for service"}, "Help text for service", "aws s3 help"),
        # Error scenarios
        ("s3", "ls", "side_effect", CommandValidationError("Test validation error"), "Command validation error: Test validation error", None),
        ("s3", "ls", "side_effect", CommandExecutionError("Test execution error"), "Error retrieving help: Test execution error", None),
        ("s3", "ls", "side_effect", Exception("Test exception"), "Error retrieving help: Test exception", None),
        # Error result from AWS command
        ("s3", "ls", "return_value", {"status": "error", "output": "Command failed"}, "Error: Command failed", "aws s3 ls help"),
    ],
)
async def test_get_command_help(service, command, mock_type, mock_value, expected_text, expected_call):
    """Test get_command_help function with various scenarios."""
    with patch("aws_mcp_server.cli_executor.execute_aws_command", new_callable=AsyncMock) as mock_execute:
        # Wire the mock up as either a canned result or a raised exception.
        setattr(mock_execute, mock_type, mock_value)

        result = await get_command_help(service, command)

        # The formatted help text should always contain the expected snippet.
        assert expected_text in result["help_text"]

        # When a concrete CLI invocation is expected, verify it precisely.
        if expected_call:
            mock_execute.assert_called_once_with(expected_call)
290 |
291 |
@pytest.mark.asyncio
async def test_execute_aws_command_with_pipe():
    """Test execute_aws_command with a piped command."""
    # Piped commands must be delegated to execute_pipe_command untouched.
    with patch("aws_mcp_server.cli_executor.is_pipe_command", return_value=True):
        with patch("aws_mcp_server.cli_executor.execute_pipe_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Piped result"}

            result = await execute_aws_command("aws s3 ls | grep bucket")

            assert result["status"] == "success"
            assert result["output"] == "Piped result"
            # The full pipeline string is forwarded with the default (None) timeout.
            mock_pipe_exec.assert_called_once_with("aws s3 ls | grep bucket", None)
305 |
306 |
@pytest.mark.asyncio
async def test_execute_pipe_command_success():
    """Test successful execution of a pipe command."""
    piped = "aws s3 ls | grep bucket"
    with patch("aws_mcp_server.cli_executor.validate_pipe_command") as mock_validate:
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Filtered results"}

            result = await execute_pipe_command(piped)

            assert result["status"] == "success"
            assert result["output"] == "Filtered results"
            # The command is validated first, then handed to the pipe executor verbatim.
            mock_validate.assert_called_once_with(piped)
            mock_pipe_exec.assert_called_once_with(piped, None)
320 |
321 |
@pytest.mark.asyncio
async def test_execute_pipe_command_ec2_with_region_added():
    """Test that region is automatically added to EC2 commands in a pipe."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Filtered EC2 instances"}

            # Control how the pipeline is split into its individual stages.
            with patch("aws_mcp_server.cli_executor.split_pipe_command") as mock_split:
                mock_split.return_value = ["aws ec2 describe-instances", "grep instance-id"]

                # Import here to ensure the test uses the actual value
                from aws_mcp_server.config import AWS_REGION

                # The first stage carries no --region; the executor must add one.
                result = await execute_pipe_command("aws ec2 describe-instances | grep instance-id")

                assert result["status"] == "success"
                assert result["output"] == "Filtered EC2 instances"

                # The first stage should have gained an explicit --region flag.
                mock_pipe_exec.assert_called_once_with(f"aws ec2 describe-instances --region {AWS_REGION} | grep instance-id", None)
345 |
346 |
@pytest.mark.asyncio
async def test_execute_pipe_command_validation_error():
    """Test execute_pipe_command with validation error."""
    # Validation failures should propagate unchanged to the caller.
    failure = CommandValidationError("Invalid pipe command")
    with patch("aws_mcp_server.cli_executor.validate_pipe_command", side_effect=failure):
        with pytest.raises(CommandValidationError) as excinfo:
            await execute_pipe_command("invalid | pipe | command")

        assert "Invalid pipe command" in str(excinfo.value)
355 |
356 |
@pytest.mark.asyncio
async def test_execute_pipe_command_execution_error():
    """Test execute_pipe_command with execution error."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", side_effect=Exception("Execution error")):
            with pytest.raises(CommandExecutionError) as excinfo:
                await execute_pipe_command("aws s3 ls | grep bucket")

            # Low-level failures are wrapped in CommandExecutionError with context.
            message = str(excinfo.value)
            assert "Failed to execute piped command" in message
            assert "Execution error" in message
367 |
368 |
369 | # New test cases to improve coverage
370 |
371 |
@pytest.mark.asyncio
async def test_execute_pipe_command_timeout():
    """Test timeout handling in piped commands."""
    timeout_msg = f"Command timed out after {DEFAULT_TIMEOUT} seconds"
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            # The underlying executor reports the timeout as an error result.
            mock_exec.return_value = {"status": "error", "output": timeout_msg}

            result = await execute_pipe_command("aws s3 ls | grep bucket")

            assert result["status"] == "error"
            assert timeout_msg in result["output"]
            mock_exec.assert_called_once()
385 |
386 |
@pytest.mark.asyncio
async def test_execute_pipe_command_with_custom_timeout():
    """Test piped command execution with custom timeout."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            mock_exec.return_value = {"status": "success", "output": "Piped output"}

            custom_timeout = 120
            await execute_pipe_command("aws s3 ls | grep bucket", timeout=custom_timeout)

            # The caller-provided timeout must reach the pipe executor intact.
            mock_exec.assert_called_once_with("aws s3 ls | grep bucket", custom_timeout)
399 |
400 |
@pytest.mark.asyncio
async def test_execute_pipe_command_large_output():
    """Test handling of large output in piped commands."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            # Output bigger than MAX_OUTPUT_SIZE; no truncation expected at this layer.
            oversized = "x" * (MAX_OUTPUT_SIZE + 1000)
            mock_exec.return_value = {"status": "success", "output": oversized}

            result = await execute_pipe_command("aws s3 ls | grep bucket")

            assert result["status"] == "success"
            # Truncation happens in the tools module, so the full length survives here.
            assert len(result["output"]) == len(oversized)
414 |
415 |
@pytest.mark.parametrize(
    "exit_code,stderr,expected_status,expected_msg",
    [
        (0, b"", "success", ""),  # Success case
        (1, b"Error: bucket not found", "error", "Error: bucket not found"),  # Standard error
        (1, b"AccessDenied", "error", "Authentication error"),  # Auth error
        (0, b"Warning: deprecated feature", "success", ""),  # Warning on stderr but success exit code
    ],
)
@pytest.mark.asyncio
async def test_execute_aws_command_exit_codes(exit_code, stderr, expected_status, expected_msg):
    """Test handling of different process exit codes and stderr output.

    The result status is driven by the exit code: stderr content only affects
    how error output is formatted (e.g. auth errors), never a zero exit code.
    """
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        process_mock = AsyncMock()
        process_mock.returncode = exit_code
        # Successful runs produce stdout; failures leave it empty.
        stdout = b"Command output" if exit_code == 0 else b""
        process_mock.communicate.return_value = (stdout, stderr)
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == expected_status
        if expected_status == "success":
            assert result["output"] == "Command output"
        else:
            assert expected_msg in result["output"]
442 |
```
--------------------------------------------------------------------------------
/spec.md:
--------------------------------------------------------------------------------
```markdown
1 | # AWS Model Context Protocol (MCP) Server Specification
2 |
3 | ## Project Overview
4 |
5 | The **AWS MCP Server** is a lightweight service that enables users to execute AWS CLI commands through an MCP (Model Context Protocol) interface. It integrates with MCP-aware AI assistants (e.g., Claude Desktop, Cursor, Windsurf) via the [Model Context Protocol](https://modelcontextprotocol.io/), which is based on JSON-RPC 2.0. The server facilitates AWS CLI command documentation and execution, returning human-readable output optimized for AI consumption.
6 |
7 | ### Key Objectives
8 |
9 | - **Command Documentation**: Provide detailed help information for AWS CLI commands.
10 | - **Command Execution**: Execute AWS CLI commands and return formatted results.
11 | - **MCP Compliance**: Fully implement the standard MCP protocol.
12 | - **Human-Readable Output**: Ensure command output is optimized for AI assistants.
13 | - **AWS Resource Context**: Provide access to AWS resources like profiles and regions.
14 | - **Easy Deployment**: Prioritize Docker-based deployment for environment consistency.
15 | - **Open Source**: Release under MIT license with GitHub repository and CI/CD.
16 |
17 | ## Core Features
18 |
19 | ### 1. Command Documentation Tool
20 |
21 | The `describe_command` tool retrieves and formats AWS CLI help information:
22 |
23 | - Use `aws help` and `aws <service> help` to access documentation.
24 | - Present results in a structured, readable format optimized for AI consumption.
25 | - Support parameter exploration to help understand command options.
26 |
27 | **Examples:**
28 |
29 | ```
30 | describe_command({"service": "s3"})
31 | // Returns high-level AWS S3 service documentation
32 |
33 | describe_command({"service": "s3", "command": "ls"})
34 | // Returns specific documentation for the S3 ls command
35 | ```
36 |
37 | ### 2. Command Execution Tool
38 |
39 | The `execute_command` tool runs AWS CLI commands:
40 |
41 | - Accept complete AWS CLI command strings.
42 | - Execute commands using the OS's AWS CLI installation.
43 | - Format output for readability.
44 | - Support optional parameters (timeout).
45 | - Support Unix pipes to filter or transform output.
46 |
47 | **Examples:**
48 |
49 | ```
50 | execute_command({"command": "aws s3 ls"})
51 | // Lists all S3 buckets
52 |
53 | execute_command({"command": "aws ec2 describe-instances --region us-west-2"})
54 | // Lists EC2 instances in the Oregon region
55 |
56 | execute_command({"command": "aws s3api list-buckets --query 'Buckets[*].Name' --output text | sort"})
57 | // Lists bucket names sorted alphabetically
58 | ```
59 |
60 | ### 3. AWS Context Resources
61 |
62 | The server exposes AWS resources through the MCP Resources protocol:
63 |
64 | - **AWS Profiles** (`aws://config/profiles`): Available AWS CLI profiles from AWS config.
65 | - **AWS Regions** (`aws://config/regions`): List of available AWS regions.
66 | - **AWS Region Details** (`aws://config/regions/{region}`): Detailed information about a specific region, including availability zones, geographic location, and services.
67 | - **AWS Environment Variables** (`aws://config/environment`): Current AWS-related environment variables and credential information.
68 | - **AWS Account Information** (`aws://config/account`): Information about the current AWS account.
69 |
70 | These resources provide context for executing AWS commands, allowing AI assistants to suggest region-specific commands, use the correct profile, and understand the current AWS environment.
71 |
72 | ### 4. Output Formatting
73 |
74 | Transform raw AWS CLI output into human-readable formats:
75 |
76 | - Default to AWS CLI's default output format.
77 | - Format complex outputs for better readability.
78 | - Handle JSON, YAML, and text output formats.
79 | - Support truncation for very large outputs.
80 |
81 | ### 5. Authentication Management
82 |
83 | - Leverage existing AWS CLI authentication on the host machine.
84 | - Support AWS profiles through command parameters.
85 | - Provide clear error messages for authentication issues.
86 | - Expose available profiles as MCP Resources.
87 |
88 | ### 6. Prompt Templates
89 |
90 | Provide a collection of useful prompt templates for common AWS use cases:
91 |
92 | - Resource creation with best practices
93 | - Security audits
94 | - Cost optimization
95 | - Resource inventory
96 | - Service troubleshooting
97 | - IAM policy generation
98 | - Service monitoring
99 | - Disaster recovery
100 | - Compliance checking
101 | - Resource cleanup
102 |
103 | ## MCP Protocol Implementation
104 |
105 | The server implements the MCP protocol with the following components:
106 |
107 | ### 1. Initialization Workflow
108 |
109 | **Client Request:**
110 |
111 | ```json
112 | {
113 | "jsonrpc": "2.0",
114 | "id": 1,
115 | "method": "initialize",
116 | "params": {
117 | "protocolVersion": "DRAFT-2025-v1",
118 | "capabilities": {
119 | "experimental": {},
120 | "resources": {}
121 | },
122 | "clientInfo": {
123 | "name": "Claude Desktop",
124 | "version": "1.0.0"
125 | }
126 | }
127 | }
128 | ```
129 |
130 | **Server Response:**
131 | ```json
132 | {
133 | "jsonrpc": "2.0",
134 | "id": 1,
135 | "result": {
136 | "protocolVersion": "DRAFT-2025-v1",
137 | "capabilities": {
138 | "tools": {},
139 | "resources": {}
140 | },
141 | "serverInfo": {
142 | "name": "AWS MCP Server",
143 | "version": "1.0.0"
144 | },
145 | "instructions": "Use this server to retrieve AWS CLI documentation and execute AWS CLI commands."
146 | }
147 | }
148 | ```
149 |
150 | **Client Notification:**
151 | ```json
152 | {
153 | "jsonrpc": "2.0",
154 | "method": "notifications/initialized"
155 | }
156 | ```
157 |
158 | ### 2. Tool Definitions
159 |
160 | The server defines two primary tools:
161 |
162 | #### describe_command
163 |
164 | **Request:**
165 | ```json
166 | {
167 | "jsonrpc": "2.0",
168 | "id": 2,
169 | "method": "tools/describe_command",
170 | "params": {
171 | "service": "s3",
172 | "command": "ls" // Optional
173 | }
174 | }
175 | ```
176 |
177 | **Response:**
178 | ```json
179 | {
180 | "jsonrpc": "2.0",
181 | "id": 2,
182 | "result": {
183 | "help_text": "Description: Lists all your buckets or all the objects in a bucket.\n\nUsage: aws s3 ls [bucket] [options]\n\nOptions:\n --bucket TEXT The bucket name\n --prefix TEXT Prefix to filter objects\n --delimiter TEXT Delimiter to use for grouping\n --max-items INTEGER Maximum number of items to return\n --page-size INTEGER Number of items to return per page\n --starting-token TEXT Starting token for pagination\n --request-payer TEXT Confirms that the requester knows they will be charged for the request\n\nExamples:\n aws s3 ls\n aws s3 ls my-bucket\n aws s3 ls my-bucket --prefix folder/\n"
184 | }
185 | }
186 | ```
187 |
188 | #### execute_command
189 |
190 | **Request:**
191 | ```json
192 | {
193 | "jsonrpc": "2.0",
194 | "id": 3,
195 | "method": "tools/execute_command",
196 | "params": {
197 | "command": "aws s3 ls --region us-west-2",
198 | "timeout": 60 // Optional
199 | }
200 | }
201 | ```
202 |
203 | **Response:**
204 | ```json
205 | {
206 | "jsonrpc": "2.0",
207 | "id": 3,
208 | "result": {
209 | "output": "2023-10-15 14:30:45 my-bucket-1\n2023-11-20 09:15:32 my-bucket-2",
210 | "status": "success"
211 | }
212 | }
213 | ```
214 |
215 | ### 3. Resource Definitions
216 |
217 | The server provides access to AWS resources:
218 |
219 | #### aws_profiles
220 |
221 | **Request:**
222 | ```json
223 | {
224 | "jsonrpc": "2.0",
225 | "id": 4,
226 | "method": "resources/aws_profiles"
227 | }
228 | ```
229 |
230 | **Response:**
231 | ```json
232 | {
233 | "jsonrpc": "2.0",
234 | "id": 4,
235 | "result": {
236 | "profiles": [
237 | { "name": "default", "is_current": true },
238 | { "name": "dev" },
239 | { "name": "prod" }
240 | ]
241 | }
242 | }
243 | ```
244 |
245 | #### aws_regions
246 |
247 | **Request:**
248 | ```json
249 | {
250 | "jsonrpc": "2.0",
251 | "id": 5,
252 | "method": "resources/aws_regions"
253 | }
254 | ```
255 |
256 | **Response:**
257 | ```json
258 | {
259 | "jsonrpc": "2.0",
260 | "id": 5,
261 | "result": {
262 | "regions": [
263 | { "name": "us-east-1", "description": "US East (N. Virginia)", "is_current": true },
264 | { "name": "us-east-2", "description": "US East (Ohio)" },
265 | { "name": "us-west-1", "description": "US West (N. California)" },
266 | { "name": "us-west-2", "description": "US West (Oregon)" }
267 | ]
268 | }
269 | }
270 | ```
271 |
272 | #### aws_region_details
273 |
274 | **Request:**
275 | ```json
276 | {
277 | "jsonrpc": "2.0",
278 | "id": 8,
279 | "method": "resources/aws_region_details",
280 | "params": {
281 | "region": "us-east-1"
282 | }
283 | }
284 | ```
285 |
286 | **Response:**
287 | ```json
288 | {
289 | "jsonrpc": "2.0",
290 | "id": 8,
291 | "result": {
292 | "code": "us-east-1",
293 | "name": "US East (N. Virginia)",
294 | "geographic_location": {
295 | "continent": "North America",
296 | "country": "United States",
297 | "city": "Ashburn, Virginia"
298 | },
299 | "availability_zones": [
300 | {
301 | "name": "us-east-1a",
302 | "state": "available",
303 | "zone_id": "use1-az1",
304 | "zone_type": "availability-zone"
305 | },
306 | {
307 | "name": "us-east-1b",
308 | "state": "available",
309 | "zone_id": "use1-az2",
310 | "zone_type": "availability-zone"
311 | }
312 | ],
313 | "services": ["ec2", "s3", "lambda", "dynamodb", "rds"],
314 | "is_current": true
315 | }
316 | }
317 | ```
318 |
319 | #### aws_environment
320 |
321 | **Request:**
322 | ```json
323 | {
324 | "jsonrpc": "2.0",
325 | "id": 6,
326 | "method": "resources/aws_environment"
327 | }
328 | ```
329 |
330 | **Response:**
331 | ```json
332 | {
333 | "jsonrpc": "2.0",
334 | "id": 6,
335 | "result": {
336 | "aws_profile": "default",
337 | "aws_region": "us-east-1",
338 | "aws_access_key_id": "AKI***********", // Masked for security
339 | "has_credentials": true,
340 | "credentials_source": "environment" // Can be "environment", "profile", "instance-profile", etc.
341 | }
342 | }
343 | ```
344 |
345 | #### aws_account
346 |
347 | **Request:**
348 | ```json
349 | {
350 | "jsonrpc": "2.0",
351 | "id": 7,
352 | "method": "resources/aws_account"
353 | }
354 | ```
355 |
356 | **Response:**
357 | ```json
358 | {
359 | "jsonrpc": "2.0",
360 | "id": 7,
361 | "result": {
362 | "account_id": "123456789012",
363 | "account_alias": "my-org",
364 | "organization_id": "o-abc123"
365 | }
366 | }
367 | ```
368 |
369 | ### 4. Error Handling
370 |
371 | The server returns standardized JSON-RPC error responses:
372 |
373 | ```json
374 | {
375 | "jsonrpc": "2.0",
376 | "id": 3,
377 | "error": {
378 | "code": -32603,
379 | "message": "Internal error",
380 | "data": "AWS CLI command failed: Unable to locate credentials"
381 | }
382 | }
383 | ```
384 |
385 | **Standard Error Codes:**
386 | - `-32600`: Invalid Request
387 | - `-32601`: Method Not Found
388 | - `-32602`: Invalid Parameters
389 | - `-32603`: Internal Error
390 |
391 | ## Architecture
392 |
393 | ### Component Architecture
394 |
395 | ```mermaid
396 | graph TD
397 | Client[MCP Client\nClaude/Cursor] <--> MCP[MCP Interface\nJSON-RPC]
398 | MCP --> Tools[Tool Handler]
399 | MCP --> Resources[Resources Handler]
400 | MCP --> Prompts[Prompt Templates]
401 | Tools --> Executor[AWS CLI Executor]
402 | Resources --> AWS_Config[AWS Config Reader]
403 | Resources --> AWS_STS[AWS STS Client]
404 |
405 | style Client fill:#f9f,stroke:#333,stroke-width:2px
406 | style MCP fill:#bbf,stroke:#333,stroke-width:2px
407 | style Tools fill:#bfb,stroke:#333,stroke-width:2px
408 | style Resources fill:#fbf,stroke:#333,stroke-width:2px
409 | style Prompts fill:#bff,stroke:#333,stroke-width:2px
410 | style Executor fill:#fbb,stroke:#333,stroke-width:2px
411 | style AWS_Config fill:#ffd,stroke:#333,stroke-width:2px
412 | style AWS_STS fill:#dff,stroke:#333,stroke-width:2px
413 | ```
414 |
415 | ### Current Components
416 |
417 | 1. **MCP Interface**
418 | - Implements JSON-RPC 2.0 protocol endpoints
419 | - Handles MCP initialization and notifications
420 | - Routes tool requests to appropriate handlers
421 | - Implemented using FastMCP library
422 |
423 | 2. **Tool Handler**
424 | - Processes `describe_command` requests
425 | - Processes `execute_command` requests
426 | - Validates parameters
427 | - Handles command execution with timeout
428 |
429 | 3. **AWS CLI Executor**
430 | - Executes AWS CLI commands via subprocess
431 | - Captures standard output and error streams
432 | - Handles command timing and timeout
433 | - Supports piped commands with Unix utilities
434 |
435 | 4. **Prompt Templates**
436 | - Provides pre-defined prompt templates for common AWS tasks
437 | - Helps ensure best practices in AWS operations
438 | - Supports various use cases like security, cost optimization, etc.
439 |
440 | ### New Components for Resources
441 |
442 | 5. **Resources Handler**
443 | - Manages MCP Resources capabilities
444 | - Provides access to AWS-specific resources
445 | - Handles resource requests and responds with resource data
446 |
447 | 6. **AWS Config Reader**
448 | - Reads AWS configuration files (~/.aws/config, ~/.aws/credentials)
449 | - Provides information about available profiles
450 | - Respects AWS credential precedence rules
451 |
452 | 7. **AWS STS Client**
453 | - Obtains AWS account information
454 | - Verifies credential validity
455 | - Provides current identity information
456 |
457 | ## Implementation Details
458 |
459 | ### 1. Server Implementation
460 |
461 | **Current Python Implementation:**
462 |
463 | ```python
464 | from mcp.server.fastmcp import Context, FastMCP
465 | from pydantic import Field
466 |
467 | # Create the FastMCP server
468 | mcp = FastMCP(
469 | "AWS MCP Server",
470 | instructions=INSTRUCTIONS,
471 | version=SERVER_INFO["version"],
472 | )
473 |
474 | # Register tools
475 | @mcp.tool()
476 | async def describe_command(
477 | service: str = Field(description="AWS service (e.g., s3, ec2)"),
478 | command: str | None = Field(description="Command within the service", default=None),
479 | ctx: Context | None = None,
480 | ) -> CommandHelpResult:
481 | """Get AWS CLI command documentation."""
482 | # Implementation...
483 |
484 | @mcp.tool()
485 | async def execute_command(
486 | command: str = Field(description="Complete AWS CLI command to execute"),
487 | timeout: int | None = Field(description="Timeout in seconds", default=None),
488 | ctx: Context | None = None,
489 | ) -> CommandResult:
490 | """Execute an AWS CLI command."""
491 | # Implementation...
492 |
493 | # Register prompts
494 | register_prompts(mcp)
495 | ```
496 |
497 | **Resource Implementation:**
498 |
499 | ```python
500 | # Register all MCP resources
501 | def register_resources(mcp):
502 | """Register all resources with the MCP server instance."""
503 | logger.info("Registering AWS resources")
504 |
505 | @mcp.resource(uri="aws://config/profiles", mime_type="application/json")
506 | async def aws_profiles() -> dict:
507 | """Get available AWS profiles."""
508 | profiles = get_aws_profiles()
509 | current_profile = os.environ.get("AWS_PROFILE", "default")
510 | return {
511 | "profiles": [
512 | {"name": profile, "is_current": profile == current_profile}
513 | for profile in profiles
514 | ]
515 | }
516 |
517 | @mcp.resource(uri="aws://config/regions", mime_type="application/json")
518 | async def aws_regions() -> dict:
519 | """Get available AWS regions."""
520 | regions = get_aws_regions()
521 | current_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
522 | return {
523 | "regions": [
524 | {
525 | "name": region["RegionName"],
526 | "description": region["RegionDescription"],
527 | "is_current": region["RegionName"] == current_region,
528 | }
529 | for region in regions
530 | ]
531 | }
532 |
533 | @mcp.resource(uri="aws://config/regions/{region}", mime_type="application/json")
534 | async def aws_region_details(region: str) -> dict:
535 | """Get detailed information about a specific AWS region."""
536 | return get_region_details(region)
537 |
538 | @mcp.resource(uri="aws://config/environment", mime_type="application/json")
539 | async def aws_environment() -> dict:
540 | """Get AWS environment information."""
541 | return get_aws_environment()
542 |
543 | @mcp.resource(uri="aws://config/account", mime_type="application/json")
544 | async def aws_account() -> dict:
545 | """Get AWS account information."""
546 | return get_aws_account_info()
547 | ```
548 |
549 | ### 2. Directory Structure
550 |
551 | Current structure:
552 |
553 | ```
554 | aws-mcp-server/
555 | ├── src/
556 | │ ├── aws_mcp_server/
557 | │ │ ├── __init__.py
558 | │ │ ├── __main__.py
559 | │ │ ├── cli_executor.py
560 | │ │ ├── config.py
561 | │ │ ├── prompts.py
562 | │ │ ├── server.py
563 | │ │ └── tools.py
564 | ├── tests/
565 | │ ├── unit/
566 | │ │ └── ...
567 | │ └── integration/
568 | │ └── ...
569 | ├── deploy/
570 | │ └── docker/
571 | │ ├── Dockerfile
572 | │ └── docker-compose.yml
573 | ├── docs/
574 | │ └── VERSION.md
575 | ├── pyproject.toml
576 | └── README.md
577 | ```
578 |
579 | Extended structure with resources:
580 |
581 | ```
582 | aws-mcp-server/
583 | ├── src/
584 | │ ├── aws_mcp_server/
585 | │ │ ├── __init__.py
586 | │ │ ├── __main__.py
587 | │ │ ├── cli_executor.py
588 | │ │ ├── config.py
589 | │ │ ├── prompts.py
590 | │ │ ├── resources.py # New file for resource implementations
591 | │ │ ├── server.py
592 | │ │ └── tools.py
593 | ├── tests/
594 | │ ├── unit/
595 | │ │ ├── test_resources.py # New tests for resources
596 | │ │ └── ...
597 | │ └── integration/
598 | │ └── ...
599 | ├── deploy/
600 | │ └── docker/
601 | │ ├── Dockerfile
602 | │ └── docker-compose.yml
603 | ├── docs/
604 | │ └── VERSION.md
605 | ├── pyproject.toml
606 | └── README.md
607 | ```
608 |
609 | ### 3. Error Handling Strategy
610 |
611 | Implement comprehensive error handling for common scenarios:
612 |
613 | - **AWS CLI Not Installed**: Check for AWS CLI presence at startup
614 | - **Authentication Failures**: Return clear error messages with resolution steps
615 | - **Permission Issues**: Clarify required AWS permissions
616 | - **Invalid Commands**: Validate commands before execution
617 | - **Timeout Handling**: Set reasonable command timeouts (default: 300 seconds)
618 | - **Resource Access Failures**: Handle failures to access AWS resources gracefully
619 |
620 | ## Deployment Strategy
621 |
622 | ### 1. Docker Deployment (Primary Method)
623 |
624 | **Dockerfile:**
625 | ```dockerfile
626 | FROM python:3.13-slim
627 |
628 | # Install AWS CLI v2
629 | RUN apt-get update && apt-get install -y \
630 | unzip \
631 | curl \
632 | less \
633 | && curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
634 | && unzip awscliv2.zip \
635 | && ./aws/install \
636 | && rm -rf awscliv2.zip aws \
637 | && apt-get clean \
638 | && rm -rf /var/lib/apt/lists/*
639 |
640 | WORKDIR /app
641 |
642 | # Copy application files
643 | COPY pyproject.toml .
644 | COPY uv.lock .
645 | RUN pip install uv && uv pip sync --system uv.lock
646 |
647 | COPY src/ ./src/
648 |
649 | # Command to run the MCP server
650 | ENTRYPOINT ["python", "-m", "aws_mcp_server"]
651 | ```
652 |
653 | **Docker Compose:**
654 | ```yaml
655 | version: '3'
656 | services:
657 | aws-mcp-server:
658 | build: .
659 | volumes:
660 | - ~/.aws:/root/.aws:ro # Mount AWS credentials as read-only
661 | environment:
662 | - AWS_PROFILE=default # Optional: specify AWS profile
663 | - AWS_REGION=us-east-1 # Optional: specify AWS region
664 | ```
665 |
666 | ### 2. Alternative: Python Virtual Environment
667 |
668 | For users who prefer direct Python installation:
669 |
670 | ```bash
671 | # Clone repository
672 | git clone https://github.com/username/aws-mcp-server.git
673 | cd aws-mcp-server
674 |
675 | # Create and activate virtual environment
676 | python -m venv .venv
677 | source .venv/bin/activate # On Windows: .venv\Scripts\activate
678 |
679 | # Install dependencies with uv
680 | pip install uv
681 | uv pip sync uv.lock
682 |
683 | # Run server
684 | python -m aws_mcp_server
685 | ```
686 |
687 | ## Testing Strategy
688 |
689 | ### 1. Unit Tests
690 |
691 | Test individual components in isolation:
692 |
693 | - **CLI Executor Tests**: Mock subprocess calls to verify command construction
694 | - **Resource Provider Tests**: Verify proper extraction of AWS profiles, regions, etc.
695 | - **MCP Resource Tests**: Test resource endpoint implementations
696 |
697 | ### 2. Integration Tests
698 |
699 | Test end-to-end functionality:
700 |
701 | - **MCP Protocol Tests**: Verify proper protocol implementation
702 | - **AWS CLI Integration**: Test with actual AWS CLI using mock credentials
703 | - **Resource Access Tests**: Verify correct resource information retrieval
704 |
705 | ### 3. Test Automation
706 |
707 | Implement CI/CD with GitHub Actions:
708 |
709 | ```yaml
710 | name: Test and Build
711 |
712 | on:
713 | push:
714 | branches: [ main ]
715 | pull_request:
716 | branches: [ main ]
717 |
718 | jobs:
719 | test:
720 | runs-on: ubuntu-latest
721 | steps:
722 | - uses: actions/checkout@v3
723 | - name: Set up Python
724 | uses: actions/setup-python@v4
725 | with:
726 | python-version: '3.13'
727 | - name: Install dependencies
728 | run: |
729 | python -m pip install --upgrade pip
730 | pip install uv
731 | uv pip sync --system uv.lock
732 | - name: Test with pytest
733 | run: |
734 | pytest --cov=src tests/
735 |
736 | build-docker:
737 | needs: test
738 | runs-on: ubuntu-latest
739 | steps:
740 | - uses: actions/checkout@v3
741 | - name: Build Docker image
742 | run: docker build -t aws-mcp-server .
743 | - name: Test Docker image
744 | run: |
745 | docker run --rm aws-mcp-server python -c "import aws_mcp_server; print('OK')"
746 | ```
747 |
748 | ## Security Considerations
749 |
750 | ### Authentication Handling
751 |
752 | - Use AWS credentials on the host machine
753 | - Support profile specification through environment variables
754 | - Never store or log AWS credentials
755 | - Mask sensitive credential information in resource outputs
756 |
757 | ### Command Validation
758 |
759 | - Verify all commands begin with "aws" prefix
760 | - Implement a simple allow/deny pattern for certain services or commands
761 | - Rely on MCP host's approval mechanism for command execution
762 |
763 | ### Resource Limitations
764 |
765 | - Set reasonable timeouts for command execution (default: 300 seconds)
766 | - Limit output size to prevent memory issues (default: 100,000 characters)
767 | - Implement rate limiting for multiple rapid commands
768 |
769 | ## Conclusion
770 |
771 | This updated AWS MCP Server specification provides a clear approach for building a server that integrates with the Model Context Protocol to execute AWS CLI commands and provide AWS resource context through MCP Resources. The implementation leverages the FastMCP library and follows best practices for AWS tool development.
772 |
773 | The updated specification enhances the original by adding MCP Resources support for AWS profiles, regions, environment, and account information. These resources provide valuable context for AI assistants to generate more accurate and relevant AWS CLI commands based on the user's AWS environment.
```
--------------------------------------------------------------------------------
/src/aws_mcp_server/resources.py:
--------------------------------------------------------------------------------
```python
1 | """AWS Resource definitions for the AWS MCP Server.
2 |
3 | This module provides MCP Resources that expose AWS environment information
4 | including available profiles, regions, and current configuration state.
5 | """
6 |
7 | import configparser
8 | import logging
9 | import os
10 | import re
11 | from typing import Any, Dict, List, Optional
12 |
13 | import boto3
14 | from botocore.exceptions import BotoCoreError, ClientError
15 |
16 | logger = logging.getLogger(__name__)
17 |
18 |
def get_aws_profiles() -> List[str]:
    """Get available AWS profiles from config and credentials files.

    Reads the AWS config and credentials files to extract all available
    profiles. Non-profile section types that can appear in the config file
    (e.g. "[sso-session xyz]" and "[services xyz]") are ignored.

    Returns:
        List of profile names; the "default" profile is always first
    """
    profiles = ["default"]  # default profile always exists
    config_paths = [
        os.path.expanduser("~/.aws/config"),
        os.path.expanduser("~/.aws/credentials"),
    ]

    try:
        for config_path in config_paths:
            if not os.path.exists(config_path):
                continue

            config = configparser.ConfigParser()
            config.read(config_path)

            for section in config.sections():
                # In config file, profiles are named [profile xyz] except default
                # In credentials file, profiles are named [xyz]
                profile_match = re.match(r"profile\s+(.+)", section)
                if profile_match:
                    # This is from config file
                    profile_name = profile_match.group(1)
                    if profile_name not in profiles:
                        profiles.append(profile_name)
                elif re.match(r"(sso-session|services)\s+", section):
                    # Config-file section types that are not profiles; skip them
                    # so they are not mistaken for credentials-file profiles.
                    continue
                elif section != "default" and section not in profiles:
                    # This is likely from credentials file
                    profiles.append(section)
    except Exception as e:
        logger.warning(f"Error reading AWS profiles: {e}")

    return profiles
57 |
58 |
def get_aws_regions() -> List[Dict[str, str]]:
    """Get available AWS regions.

    Uses boto3 to retrieve the list of available AWS regions.
    Credentials are picked up automatically from environment variables
    when no shared config file is available.

    Returns:
        List of region dictionaries with name and description
    """
    fallback_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
    try:
        # boto3 resolves credentials from environment variables automatically
        # if no config file is present.
        session = boto3.session.Session(region_name=fallback_region)
        described = session.client("ec2").describe_regions()

        # Pair each region code with a friendly description, sorted by code.
        result = [
            {"RegionName": entry["RegionName"], "RegionDescription": _get_region_description(entry["RegionName"])}
            for entry in described["Regions"]
        ]
        result.sort(key=lambda item: item["RegionName"])
        return result
    except (BotoCoreError, ClientError) as e:
        logger.warning(f"Error fetching AWS regions: {e}")
        # Fallback to a static list of common regions
        return [
            {"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
            {"RegionName": "us-east-2", "RegionDescription": "US East (Ohio)"},
            {"RegionName": "us-west-1", "RegionDescription": "US West (N. California)"},
            {"RegionName": "us-west-2", "RegionDescription": "US West (Oregon)"},
            {"RegionName": "eu-west-1", "RegionDescription": "EU West (Ireland)"},
            {"RegionName": "eu-west-2", "RegionDescription": "EU West (London)"},
            {"RegionName": "eu-central-1", "RegionDescription": "EU Central (Frankfurt)"},
            {"RegionName": "ap-northeast-1", "RegionDescription": "Asia Pacific (Tokyo)"},
            {"RegionName": "ap-northeast-2", "RegionDescription": "Asia Pacific (Seoul)"},
            {"RegionName": "ap-southeast-1", "RegionDescription": "Asia Pacific (Singapore)"},
            {"RegionName": "ap-southeast-2", "RegionDescription": "Asia Pacific (Sydney)"},
            {"RegionName": "sa-east-1", "RegionDescription": "South America (São Paulo)"},
        ]
    except Exception as e:
        logger.warning(f"Unexpected error fetching AWS regions: {e}")
        return []
106 |
107 |
108 | def _get_region_description(region_code: str) -> str:
109 | """Convert region code to a human-readable description.
110 |
111 | Args:
112 | region_code: AWS region code (e.g., us-east-1)
113 |
114 | Returns:
115 | Human-readable region description
116 | """
117 | region_map = {
118 | "us-east-1": "US East (N. Virginia)",
119 | "us-east-2": "US East (Ohio)",
120 | "us-west-1": "US West (N. California)",
121 | "us-west-2": "US West (Oregon)",
122 | "af-south-1": "Africa (Cape Town)",
123 | "ap-east-1": "Asia Pacific (Hong Kong)",
124 | "ap-south-1": "Asia Pacific (Mumbai)",
125 | "ap-northeast-1": "Asia Pacific (Tokyo)",
126 | "ap-northeast-2": "Asia Pacific (Seoul)",
127 | "ap-northeast-3": "Asia Pacific (Osaka)",
128 | "ap-southeast-1": "Asia Pacific (Singapore)",
129 | "ap-southeast-2": "Asia Pacific (Sydney)",
130 | "ap-southeast-3": "Asia Pacific (Jakarta)",
131 | "ca-central-1": "Canada (Central)",
132 | "eu-central-1": "EU Central (Frankfurt)",
133 | "eu-west-1": "EU West (Ireland)",
134 | "eu-west-2": "EU West (London)",
135 | "eu-west-3": "EU West (Paris)",
136 | "eu-north-1": "EU North (Stockholm)",
137 | "eu-south-1": "EU South (Milan)",
138 | "me-south-1": "Middle East (Bahrain)",
139 | "sa-east-1": "South America (São Paulo)",
140 | }
141 |
142 | return region_map.get(region_code, f"AWS Region {region_code}")
143 |
144 |
def get_region_available_services(session: boto3.session.Session, region_code: str) -> List[Dict[str, str]]:
    """Get available AWS services for a specific region.

    Uses the Service Quotas API to get a comprehensive list of services
    available in the given region. Falls back to testing client creation for
    a curated set of common services if the Service Quotas API fails.

    Args:
        session: Boto3 session to use for API calls
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        List of dictionaries with service ID and name
    """
    services = []
    try:
        quotas = session.client("service-quotas", region_name=region_code)

        # Page through every service known to the Service Quotas API.
        request_kwargs = {}
        while True:
            page = quotas.list_services(**request_kwargs)

            for entry in page.get("Services", []):
                code = entry.get("ServiceCode")
                if not code:
                    continue
                # Translate Service Quotas service codes into boto3-style
                # service identifiers: strip an "AWS." prefix, or take the
                # last dotted component, then lowercase.
                if code.startswith("AWS."):
                    service_id = code[4:].lower()
                elif "." in code:
                    service_id = code.split(".")[-1].lower()
                else:
                    service_id = code.lower()
                services.append({"id": service_id, "name": entry.get("ServiceName", code)})

            token = page.get("NextToken")
            if not token:
                break
            request_kwargs = {"NextToken": token}

    except Exception as e:
        logger.debug(f"Error fetching services with Service Quotas API for {region_code}: {e}")
        # Fall back to the client creation method for a subset of common services.
        common_services = [
            "ec2",
            "s3",
            "lambda",
            "rds",
            "dynamodb",
            "cloudformation",
            "sqs",
            "sns",
            "iam",
            "cloudwatch",
            "kinesis",
            "apigateway",
            "ecs",
            "ecr",
            "eks",
            "route53",
            "secretsmanager",
            "ssm",
            "kms",
            "elasticbeanstalk",
            "elasticache",
            "elasticsearch",
        ]

        for service_name in common_services:
            try:
                # Successful client creation suggests the service exists in
                # this region; failures are treated as "not available".
                session.client(service_name, region_name=region_code)
            except Exception:
                continue
            services.append(
                {"id": service_name, "name": service_name.upper() if service_name in ["ec2", "s3"] else service_name.replace("-", " ").title()}
            )

    return services
235 |
236 |
237 | def _get_region_geographic_location(region_code: str) -> Dict[str, str]:
238 | """Get geographic location information for a region.
239 |
240 | Args:
241 | region_code: AWS region code (e.g., us-east-1)
242 |
243 | Returns:
244 | Dictionary with geographic information
245 | """
246 | # Map of region codes to geographic information
247 | geo_map = {
248 | "us-east-1": {"continent": "North America", "country": "United States", "city": "Ashburn, Virginia"},
249 | "us-east-2": {"continent": "North America", "country": "United States", "city": "Columbus, Ohio"},
250 | "us-west-1": {"continent": "North America", "country": "United States", "city": "San Francisco, California"},
251 | "us-west-2": {"continent": "North America", "country": "United States", "city": "Portland, Oregon"},
252 | "af-south-1": {"continent": "Africa", "country": "South Africa", "city": "Cape Town"},
253 | "ap-east-1": {"continent": "Asia", "country": "China", "city": "Hong Kong"},
254 | "ap-south-1": {"continent": "Asia", "country": "India", "city": "Mumbai"},
255 | "ap-northeast-1": {"continent": "Asia", "country": "Japan", "city": "Tokyo"},
256 | "ap-northeast-2": {"continent": "Asia", "country": "South Korea", "city": "Seoul"},
257 | "ap-northeast-3": {"continent": "Asia", "country": "Japan", "city": "Osaka"},
258 | "ap-southeast-1": {"continent": "Asia", "country": "Singapore", "city": "Singapore"},
259 | "ap-southeast-2": {"continent": "Oceania", "country": "Australia", "city": "Sydney"},
260 | "ap-southeast-3": {"continent": "Asia", "country": "Indonesia", "city": "Jakarta"},
261 | "ca-central-1": {"continent": "North America", "country": "Canada", "city": "Montreal"},
262 | "eu-central-1": {"continent": "Europe", "country": "Germany", "city": "Frankfurt"},
263 | "eu-west-1": {"continent": "Europe", "country": "Ireland", "city": "Dublin"},
264 | "eu-west-2": {"continent": "Europe", "country": "United Kingdom", "city": "London"},
265 | "eu-west-3": {"continent": "Europe", "country": "France", "city": "Paris"},
266 | "eu-north-1": {"continent": "Europe", "country": "Sweden", "city": "Stockholm"},
267 | "eu-south-1": {"continent": "Europe", "country": "Italy", "city": "Milan"},
268 | "me-south-1": {"continent": "Middle East", "country": "Bahrain", "city": "Manama"},
269 | "sa-east-1": {"continent": "South America", "country": "Brazil", "city": "São Paulo"},
270 | }
271 |
272 | # Return default information if region not found
273 | default_geo = {"continent": "Unknown", "country": "Unknown", "city": "Unknown"}
274 | return geo_map.get(region_code, default_geo)
275 |
276 |
def get_region_details(region_code: str) -> Dict[str, Any]:
    """Get detailed information about a specific AWS region.

    Combines static metadata (name, geographic location) with live data
    (availability zones, available services) fetched via boto3. API failures
    degrade gracefully: the static fields are always returned.

    Args:
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        Dictionary with region details
    """
    active_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
    details = {
        "code": region_code,
        "name": _get_region_description(region_code),
        "geographic_location": _get_region_geographic_location(region_code),
        "availability_zones": [],
        "services": [],
        "is_current": region_code == active_region,
    }

    try:
        # Session pinned to the requested region.
        session = boto3.session.Session(region_name=region_code)

        # Availability zones: failures here are non-fatal.
        try:
            ec2 = session.client("ec2", region_name=region_code)
            response = ec2.describe_availability_zones(Filters=[{"Name": "region-name", "Values": [region_code]}])

            details["availability_zones"] = [
                {
                    "name": az.get("ZoneName", ""),
                    "state": az.get("State", ""),
                    "zone_id": az.get("ZoneId", ""),
                    "zone_type": az.get("ZoneType", ""),
                }
                for az in response.get("AvailabilityZones", [])
            ]
        except Exception as e:
            logger.debug(f"Error fetching availability zones for {region_code}: {e}")

        # Available services for the region.
        details["services"] = get_region_available_services(session, region_code)

    except Exception as e:
        logger.warning(f"Error fetching region details for {region_code}: {e}")

    return details
326 |
327 |
def get_aws_environment() -> Dict[str, str]:
    """Get information about the current AWS environment.

    Collects information about the active AWS environment, including
    profile, region, and credential status. Works with both config files
    and environment variables for credentials.

    Returns:
        Dictionary with AWS environment information
    """
    environment = {
        "aws_profile": os.environ.get("AWS_PROFILE", "default"),
        "aws_region": os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")),
        "has_credentials": False,
        "credentials_source": "none",
    }

    # Map botocore's credential-resolution method names onto the labels we
    # report; anything unrecognized is treated as profile-based.
    method_labels = {
        "shared-credentials-file": "profile",
        "environment": "environment",
        "iam-role": "instance-profile",
        "assume-role": "assume-role",
        "container-role": "container-role",
    }

    try:
        # Preferred: let boto3 resolve credentials through its normal chain.
        creds = boto3.session.Session().get_credentials()
        if creds:
            environment["has_credentials"] = True
            environment["credentials_source"] = method_labels.get(creds.method, "profile")
    except Exception as e:
        logger.warning(f"Error checking credentials: {e}")

    return environment
370 |
371 |
372 | def _mask_key(key: str) -> str:
373 | """Mask a sensitive key for security.
374 |
375 | Args:
376 | key: The key to mask
377 |
378 | Returns:
379 | Masked key with only the first few characters visible
380 | """
381 | if not key:
382 | return ""
383 |
384 | # Show only first few characters
385 | visible_len = min(3, len(key))
386 | return key[:visible_len] + "*" * (len(key) - visible_len)
387 |
388 |
def get_aws_account_info() -> Dict[str, Optional[str]]:
    """Get information about the current AWS account.

    Uses STS to retrieve account ID and alias information.
    Automatically uses credentials from environment variables if no config file is available.

    The alias and organization lookups are best-effort: each is wrapped in
    its own try/except so that a permission failure on one (common for IAM
    and Organizations APIs) does not prevent returning the rest.

    Returns:
        Dictionary with AWS account information; any field that could not be
        determined is left as None
    """
    account_info = {
        "account_id": None,
        "account_alias": None,
        "organization_id": None,
    }

    try:
        # Create a session - boto3 will automatically use credentials from
        # environment variables if no config file is available
        session = boto3.session.Session(region_name=os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")))

        # Get account ID from STS
        sts = session.client("sts")
        account_id = sts.get_caller_identity().get("Account")
        account_info["account_id"] = account_id

        # Try to get account alias (requires iam:ListAccountAliases; only the
        # first alias is reported)
        if account_id:
            try:
                iam = session.client("iam")
                aliases = iam.list_account_aliases().get("AccountAliases", [])
                if aliases:
                    account_info["account_alias"] = aliases[0]
            except Exception as e:
                logger.debug(f"Error getting account alias: {e}")

            # Try to get organization info
            try:
                org = session.client("organizations")
                # First try to get organization info
                try:
                    org_response = org.describe_organization()
                    if "OrganizationId" in org_response:
                        account_info["organization_id"] = org_response["OrganizationId"]
                except Exception:
                    # Then try to get account-specific info if org-level call fails
                    account_response = org.describe_account(AccountId=account_id)
                    if "Account" in account_response and "Id" in account_response["Account"]:
                        # The account ID itself isn't the organization ID, but we might
                        # be able to extract information from other means
                        account_info["account_id"] = account_response["Account"]["Id"]
            except Exception as e:
                # Organizations access is often restricted, so this is expected to fail in many cases
                logger.debug(f"Error getting organization info: {e}")
    except Exception as e:
        logger.warning(f"Error getting AWS account info: {e}")

    return account_info
446 |
447 |
def register_resources(mcp):
    """Register all resources with the MCP server instance.

    Each resource is an async closure registered via ``mcp.resource`` and
    backed by the module-level helper functions. Environment variables are
    read inside the handlers, so changes to AWS_PROFILE / AWS_REGION between
    requests are reflected in subsequent responses.

    Args:
        mcp: The FastMCP server instance
    """
    logger.info("Registering AWS resources")

    @mcp.resource(name="aws_profiles", description="Get available AWS profiles", uri="aws://config/profiles", mime_type="application/json")
    async def aws_profiles() -> dict:
        """Get available AWS profiles.

        Retrieves a list of available AWS profile names from the
        AWS configuration and credentials files.

        Returns:
            Dictionary with profile information
        """
        profiles = get_aws_profiles()
        # Mirror the AWS CLI: fall back to "default" when AWS_PROFILE is unset
        current_profile = os.environ.get("AWS_PROFILE", "default")
        return {"profiles": [{"name": profile, "is_current": profile == current_profile} for profile in profiles]}

    @mcp.resource(name="aws_regions", description="Get available AWS regions", uri="aws://config/regions", mime_type="application/json")
    async def aws_regions() -> dict:
        """Get available AWS regions.

        Retrieves a list of available AWS regions with
        their descriptive names.

        Returns:
            Dictionary with region information
        """
        regions = get_aws_regions()
        # AWS_REGION takes precedence over AWS_DEFAULT_REGION; default us-east-1
        current_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
        return {
            "regions": [
                {
                    "name": region["RegionName"],
                    "description": region["RegionDescription"],
                    "is_current": region["RegionName"] == current_region,
                }
                for region in regions
            ]
        }

    @mcp.resource(
        name="aws_region_details",
        description="Get detailed information about a specific AWS region",
        uri="aws://config/regions/{region}",
        mime_type="application/json",
    )
    async def aws_region_details(region: str) -> dict:
        """Get detailed information about a specific AWS region.

        Retrieves detailed information about a specific AWS region,
        including its name, code, availability zones, geographic location,
        and available services.

        Args:
            region: AWS region code (e.g., us-east-1)

        Returns:
            Dictionary with detailed region information
        """
        logger.info(f"Getting detailed information for region: {region}")
        return get_region_details(region)

    @mcp.resource(name="aws_environment", description="Get AWS environment information", uri="aws://config/environment", mime_type="application/json")
    async def aws_environment() -> dict:
        """Get AWS environment information.

        Retrieves information about the current AWS environment,
        including profile, region, and credential status.

        Returns:
            Dictionary with environment information
        """
        return get_aws_environment()

    @mcp.resource(name="aws_account", description="Get AWS account information", uri="aws://config/account", mime_type="application/json")
    async def aws_account() -> dict:
        """Get AWS account information.

        Retrieves information about the current AWS account,
        including account ID and alias.

        Returns:
            Dictionary with account information
        """
        return get_aws_account_info()

    logger.info("Successfully registered all AWS resources")
540 |
```
--------------------------------------------------------------------------------
/src/aws_mcp_server/security.py:
--------------------------------------------------------------------------------
```python
1 | """Security utilities for AWS MCP Server.
2 |
3 | This module provides security validation for AWS CLI commands,
4 | including validation of command structure, dangerous command detection,
5 | and pipe command validation.
6 | """
7 |
8 | import logging
9 | import re
10 | import shlex
11 | from dataclasses import dataclass, field
12 | from pathlib import Path
13 | from typing import Dict, List, Optional
14 |
15 | import yaml
16 |
17 | from aws_mcp_server.config import SECURITY_CONFIG_PATH, SECURITY_MODE
18 | from aws_mcp_server.tools import (
19 | is_pipe_command,
20 | split_pipe_command,
21 | validate_unix_command,
22 | )
23 |
24 | logger = logging.getLogger(__name__)
25 |
26 | # Default dictionary of potentially dangerous commands by security category
27 | # Focus on commands that could lead to security incidents, privilege escalation,
28 | # credential theft, or account takeover
29 | DEFAULT_DANGEROUS_COMMANDS: Dict[str, List[str]] = {
30 | # Identity and Access Management - core of security
31 | "iam": [
32 | "aws iam create-user", # Creating new users (potential backdoor accounts)
33 | "aws iam create-access-key", # Creating credentials (could lead to credential theft)
34 | "aws iam attach-user-policy", # Attaching policies to users (privilege escalation)
35 | "aws iam attach-role-policy", # Attaching policies to roles (privilege escalation)
36 | "aws iam attach-group-policy", # Attaching policies to groups (privilege escalation)
37 | "aws iam create-policy", # Creating new policies (potentially overprivileged)
38 | "aws iam put-user-policy", # Inline policies for users (privilege escalation)
39 | "aws iam put-role-policy", # Inline policies for roles (privilege escalation)
40 | "aws iam put-group-policy", # Inline policies for groups (privilege escalation)
41 | "aws iam create-login-profile", # Creating console passwords (potential backdoor)
42 | "aws iam update-access-key", # Updating access key status (credential management)
43 | "aws iam update-assume-role-policy", # Changing who can assume a role
44 | "aws iam remove-role-from-instance-profile", # Removing roles (privilege escalation)
45 | "aws iam update-role", # Modifying role (privilege escalation)
46 | "aws iam create-virtual-mfa-device", # Creating MFA devices
47 | "aws iam deactivate-mfa-device", # Disabling MFA (security circumvention)
48 | "aws iam delete-", # Any IAM delete operations (potential denial of service)
49 | ],
50 | # Security, Identity & Compliance services
51 | "organizations": [
52 | "aws organizations create-account", # Creating accounts
53 | "aws organizations leave-organization", # Leaving an organization
54 | "aws organizations remove-account-from-organization", # Removing accounts
55 | "aws organizations disable-policy-type", # Disabling policy enforcement
56 | "aws organizations create-policy", # Creating organization policies
57 | "aws organizations attach-policy", # Attaching organization policies
58 | ],
59 | "sts": [
60 | "aws sts assume-role", # Assuming roles with potentially higher privileges
61 | "aws sts get-session-token", # Getting session tokens
62 | "aws sts get-federation-token", # Getting federated tokens
63 | ],
64 | "secretsmanager": [
65 | "aws secretsmanager put-secret-value", # Changing secrets
66 | "aws secretsmanager update-secret", # Updating secrets
67 | "aws secretsmanager delete-secret", # Deleting secrets
68 | "aws secretsmanager restore-secret", # Restoring deleted secrets
69 | ],
70 | "kms": [
71 | "aws kms schedule-key-deletion", # Scheduling key deletion (potential data loss)
72 | "aws kms disable-key", # Disabling keys (potential data loss)
73 | "aws kms create-grant", # Creating grants (key access)
74 | "aws kms revoke-grant", # Revoking grants (potential denial of service)
75 | ],
76 | # Audit & Logging services - tampering with these is critical
77 | "cloudtrail": [
78 | "aws cloudtrail delete-trail", # Deleting audit trails
79 | "aws cloudtrail stop-logging", # Stopping audit logging
80 | "aws cloudtrail update-trail", # Modifying audit configurations
81 | "aws cloudtrail put-event-selectors", # Changing what events are logged
82 | ],
83 | "cloudwatch": [
84 | "aws cloudwatch delete-alarms", # Deleting security alarms
85 | "aws cloudwatch disable-alarm-actions", # Disabling alarm actions
86 | "aws cloudwatch delete-dashboards", # Deleting monitoring dashboards
87 | ],
88 | "config": [
89 | "aws configservice delete-configuration-recorder", # Deleting config recording
90 | "aws configservice stop-configuration-recorder", # Stopping config recording
91 | "aws configservice delete-delivery-channel", # Deleting config delivery
92 | "aws configservice delete-remediation-configuration", # Deleting auto-remediation
93 | ],
94 | "guardduty": [
95 | "aws guardduty delete-detector", # Deleting threat detection
96 | "aws guardduty disable-organization-admin-account", # Disabling central security
97 | "aws guardduty update-detector", # Modifying threat detection
98 | ],
99 | # Network & Data security
100 | "ec2": [
101 | "aws ec2 authorize-security-group-ingress", # Opening inbound network access
102 | "aws ec2 authorize-security-group-egress", # Opening outbound network access
103 | "aws ec2 modify-instance-attribute", # Changing security attributes
104 | ],
105 | "s3": [
106 | "aws s3api put-bucket-policy", # Changing bucket permissions
107 | "aws s3api put-bucket-acl", # Changing bucket ACLs
108 | "aws s3api delete-bucket-policy", # Removing bucket policy protections
109 | "aws s3api delete-bucket-encryption", # Removing encryption
110 | "aws s3api put-public-access-block", # Changing public access settings
111 | ],
112 | }
113 |
114 | # Default dictionary of safe patterns that override dangerous commands
115 | # These patterns explicitly allow read-only operations that are needed for normal use
116 | DEFAULT_SAFE_PATTERNS: Dict[str, List[str]] = {
117 | # Universal safe patterns for any AWS service
118 | "general": [
119 | "--help", # All help commands are safe
120 | "help", # All help subcommands are safe
121 | "--version", # Version information is safe
122 | "--dry-run", # Dry run operations don't make changes
123 | ],
124 | # Identity and Access Management
125 | "iam": [
126 | "aws iam get-", # Read-only IAM operations
127 | "aws iam list-", # Listing IAM resources
128 | "aws iam generate-credential-report", # Generate reports (no security impact)
129 | "aws iam generate-service-last-accessed-details", # Generate access reports
130 | "aws iam simulate-custom-policy", # Policy simulation (no changes)
131 | "aws iam simulate-principal-policy", # Policy simulation (no changes)
132 | ],
133 | # Security, Identity & Compliance services
134 | "organizations": [
135 | "aws organizations describe-", # Read-only Organizations operations
136 | "aws organizations list-", # Listing Organization resources
137 | ],
138 | "sts": [
139 | "aws sts get-caller-identity", # Checking current identity (safe)
140 | "aws sts decode-authorization-message", # Decoding error messages (safe)
141 | ],
142 | "secretsmanager": [
143 | "aws secretsmanager get-", # Reading secrets (note: still sensitive)
144 | "aws secretsmanager list-", # Listing secrets
145 | "aws secretsmanager describe-", # Reading metadata about secrets
146 | ],
147 | "kms": [
148 | "aws kms describe-", # Reading key metadata
149 | "aws kms get-", # Getting key information
150 | "aws kms list-", # Listing keys
151 | ],
152 | # Audit & Logging services
153 | "cloudtrail": [
154 | "aws cloudtrail describe-", # Reading trail info
155 | "aws cloudtrail get-", # Getting trail settings
156 | "aws cloudtrail list-", # Listing trails
157 | "aws cloudtrail lookup-events", # Searching events (read-only)
158 | ],
159 | "cloudwatch": [
160 | "aws cloudwatch describe-", # Reading alarm info
161 | "aws cloudwatch get-", # Getting metric data
162 | "aws cloudwatch list-", # Listing metrics and alarms
163 | ],
164 | "config": [
165 | "aws configservice describe-", # Reading configuration info
166 | "aws configservice get-", # Getting config data
167 | "aws configservice list-", # Listing config resources
168 | "aws configservice select-resource-config", # Querying config (read-only)
169 | ],
170 | "guardduty": [
171 | "aws guardduty describe-", # Reading detector info
172 | "aws guardduty get-", # Getting findings and settings
173 | "aws guardduty list-", # Listing GuardDuty resources
174 | ],
175 | # Network & Data security
176 | "ec2": [
177 | "aws ec2 describe-", # All EC2 describe operations
178 | "aws ec2 get-", # All EC2 get operations
179 | # Network security specific commands
180 | "aws ec2 describe-security-groups", # Reading security group configurations
181 | "aws ec2 describe-network-acls", # Reading network ACL configurations
182 | ],
183 | "s3": [
184 | "aws s3 ls", # Listing buckets or objects (read-only)
185 | "aws s3api get-", # All S3 API get operations (read-only)
186 | "aws s3api list-", # All S3 API list operations (read-only)
187 | "aws s3api head-", # All S3 API head operations (read-only)
188 | # Security-specific S3 operations
189 | "aws s3api get-bucket-policy", # Reading bucket policies
190 | "aws s3api get-bucket-encryption", # Reading encryption settings
191 | "aws s3api get-public-access-block", # Reading public access settings
192 | ],
193 | }
194 |
195 | # Default regex patterns for more complex rules that cannot be easily captured
196 | # with simple command prefix matching
197 | DEFAULT_REGEX_RULES: Dict[str, List[Dict[str, str]]] = {
198 | # Security patterns that apply to all AWS services
199 | "general": [
200 | # Identity and Authentication Risks
201 | {
202 | "pattern": r"aws .* --profile\s+(root|admin|administrator)",
203 | "description": "Prevent use of sensitive profiles",
204 | "error_message": "Using sensitive profiles (root, admin) is restricted for security reasons.",
205 | },
206 | # Protocol and Encryption Risks
207 | {
208 | "pattern": r"aws .* --no-verify-ssl",
209 | "description": "Prevent disabling SSL verification",
210 | "error_message": "Disabling SSL verification is not allowed for security reasons.",
211 | },
212 | {
213 | "pattern": r"aws .* --output\s+text\s+.*--query\s+.*Password",
214 | "description": "Prevent password exposure in text output",
215 | "error_message": "Outputting sensitive data like passwords in text format is restricted.",
216 | },
217 | # Parameter security
218 | {
219 | "pattern": r"aws .* --debug",
220 | "description": "Prevent debug mode which shows sensitive info",
221 | "error_message": "Debug mode is restricted as it may expose sensitive information.",
222 | },
223 | ],
224 | # IAM-specific security patterns
225 | "iam": [
226 | # Privileged user creation
227 | {
228 | "pattern": r"aws iam create-user.*--user-name\s+(root|admin|administrator|backup|security|finance|billing)",
229 | "description": "Prevent creation of privileged-sounding users",
230 | "error_message": "Creating users with sensitive names is restricted for security reasons.",
231 | },
232 | # Privilege escalation via policies
233 | {
234 | "pattern": r"aws iam attach-user-policy.*--policy-arn\s+.*Administrator",
235 | "description": "Prevent attaching Administrator policies",
236 | "error_message": "Attaching Administrator policies is restricted for security reasons.",
237 | },
238 | {
239 | "pattern": r"aws iam attach-user-policy.*--policy-arn\s+.*FullAccess",
240 | "description": "Prevent attaching FullAccess policies to users",
241 | "error_message": "Attaching FullAccess policies directly to users is restricted (use roles instead).",
242 | },
243 | {
244 | "pattern": r"aws iam create-policy.*\"Effect\":\s*\"Allow\".*\"Action\":\s*\"\*\".*\"Resource\":\s*\"\*\"",
245 | "description": "Prevent creation of policies with * permissions",
246 | "error_message": "Creating policies with unrestricted (*) permissions is not allowed.",
247 | },
248 | # Password and access key controls
249 | {
250 | "pattern": r"aws iam create-login-profile.*--password-reset-required\s+false",
251 | "description": "Enforce password reset for new profiles",
252 | "error_message": "Creating login profiles without requiring password reset is restricted.",
253 | },
254 | {
255 | "pattern": r"aws iam update-account-password-policy.*--require-uppercase-characters\s+false",
256 | "description": "Prevent weakening password policies",
257 | "error_message": "Weakening account password policies is restricted.",
258 | },
259 | ],
260 | # Data security patterns
261 | "s3": [
262 | # Public access risks
263 | {
264 | "pattern": r"aws s3api put-bucket-policy.*\"Effect\":\s*\"Allow\".*\"Principal\":\s*\"\*\"",
265 | "description": "Prevent public bucket policies",
266 | "error_message": "Creating public bucket policies is restricted for security reasons.",
267 | },
268 | {
269 | "pattern": r"aws s3api put-public-access-block.*--public-access-block-configuration\s+.*\"BlockPublicAcls\":\s*false",
270 | "description": "Prevent disabling public access blocks",
271 | "error_message": "Disabling S3 public access blocks is restricted for security reasons.",
272 | },
273 | # Encryption risks
274 | {
275 | "pattern": r"aws s3api create-bucket.*--region\s+(?!eu|us-east-1).*--acl\s+public",
276 | "description": "Prevent public buckets outside of allowed regions",
277 | "error_message": "Creating public buckets outside allowed regions is restricted.",
278 | },
279 | ],
280 | # Network security patterns
281 | "ec2": [
282 | # Network exposure risks
283 | {
284 | "pattern": r"aws ec2 authorize-security-group-ingress.*--cidr\s+0\.0\.0\.0/0.*--port\s+(?!80|443)[0-9]+",
285 | "description": "Prevent open security groups for non-web ports",
286 | "error_message": "Opening non-web ports (other than 80/443) to the entire internet (0.0.0.0/0) is restricted.",
287 | },
288 | {
289 | "pattern": r"aws ec2 run-instances.*--user-data\s+.*curl.*\|.*sh",
290 | "description": "Detect potentially unsafe user-data scripts",
291 | "error_message": "Running scripts from remote sources in user-data presents security risks.",
292 | },
293 | ],
294 | # Logging and monitoring integrity
295 | "cloudtrail": [
296 | {
297 | "pattern": r"aws cloudtrail update-trail.*--no-include-global-service-events",
298 | "description": "Prevent disabling global event logging",
299 | "error_message": "Disabling CloudTrail logging for global service events is restricted.",
300 | },
301 | {
302 | "pattern": r"aws cloudtrail update-trail.*--no-multi-region",
303 | "description": "Prevent making trails single-region",
304 | "error_message": "Changing CloudTrail trails from multi-region to single-region is restricted.",
305 | },
306 | ],
307 | }
308 |
309 |
@dataclass
class ValidationRule:
    """A single rule used when validating AWS CLI commands.

    Attributes:
        pattern: Command prefix or regular expression to match against.
        description: Short explanation of what the rule guards against.
        error_message: Message reported to the caller when the rule rejects a command.
        regex: Whether ``pattern`` is a regular expression (True) or a plain prefix (False).
    """

    pattern: str
    description: str
    error_message: str
    regex: bool = False


@dataclass
class SecurityConfig:
    """Container for the command-validation security settings.

    Attributes:
        dangerous_commands: Restricted command prefixes, keyed by AWS service.
        safe_patterns: Patterns overriding dangerous matches, keyed by service.
        regex_rules: Regex-based rules, keyed by service/category.
    """

    dangerous_commands: Dict[str, List[str]]
    safe_patterns: Dict[str, List[str]]
    regex_rules: Dict[str, List[ValidationRule]] = field(default_factory=dict)

    def __post_init__(self):
        """Normalize a falsy ``regex_rules`` value (e.g. None) to an empty dict."""
        self.regex_rules = self.regex_rules or {}
332 |
333 |
def load_security_config() -> SecurityConfig:
    """Load security configuration from YAML file or use defaults.

    Starts from the built-in DEFAULT_* rule sets and, when SECURITY_CONFIG_PATH
    points at an existing YAML file, overlays its "dangerous_commands",
    "safe_patterns", and "regex_rules" sections. Custom dangerous/safe entries
    replace the defaults for the same service; custom regex rules are appended.
    Any error while reading the custom file is logged and the defaults are kept.

    Returns:
        SecurityConfig object with loaded configuration
    """
    dangerous_commands = DEFAULT_DANGEROUS_COMMANDS.copy()
    safe_patterns = DEFAULT_SAFE_PATTERNS.copy()
    regex_rules: Dict[str, List[ValidationRule]] = {}

    # Convert the default regex-rule dicts into ValidationRule objects
    for category, rules in DEFAULT_REGEX_RULES.items():
        regex_rules[category] = [
            ValidationRule(
                pattern=rule["pattern"],
                description=rule["description"],
                error_message=rule["error_message"],
                regex=True,
            )
            for rule in rules
        ]

    # Load custom configuration if provided
    if SECURITY_CONFIG_PATH:
        config_path = Path(SECURITY_CONFIG_PATH)
        if config_path.exists():
            try:
                with open(config_path) as f:
                    # safe_load returns None for an empty file; treat that as
                    # "no overrides" instead of failing the membership checks below
                    config_data = yaml.safe_load(f) or {}

                # Update dangerous commands (custom list replaces default per service)
                for service, commands in config_data.get("dangerous_commands", {}).items():
                    dangerous_commands[service] = commands

                # Update safe patterns (custom list replaces default per service)
                for service, patterns in config_data.get("safe_patterns", {}).items():
                    safe_patterns[service] = patterns

                # Load custom regex rules (appended to defaults per category)
                for category, rules in config_data.get("regex_rules", {}).items():
                    category_rules = regex_rules.setdefault(category, [])
                    for rule in rules:
                        category_rules.append(
                            ValidationRule(
                                pattern=rule["pattern"],
                                description=rule["description"],
                                error_message=rule.get("error_message", f"Command matches restricted pattern: {rule['pattern']}"),
                                regex=True,
                            )
                        )

                logger.info(f"Loaded security configuration from {config_path}")
            except Exception as e:
                logger.error(f"Error loading security configuration: {str(e)}")
                logger.warning("Using default security configuration")

    return SecurityConfig(dangerous_commands=dangerous_commands, safe_patterns=safe_patterns, regex_rules=regex_rules)
397 |
398 |
399 | # Initialize security configuration
400 | SECURITY_CONFIG = load_security_config()
401 |
402 |
def is_service_command_safe(command: str, service: str) -> bool:
    """Decide whether a command matching a dangerous pattern is still allowed.

    A command is treated as safe when it starts with one of the
    service-specific safe prefixes (e.g. "aws iam list-") or when it contains
    one of the general safe patterns that apply to every service
    (e.g. "--help").

    Args:
        command: The command to check
        service: The AWS service being used

    Returns:
        True if the command is safe, False otherwise
    """
    # Service-specific safe patterns are matched as command prefixes.
    for pattern in SECURITY_CONFIG.safe_patterns.get(service, []):
        if command.startswith(pattern):
            logger.debug(f"Command matches service-specific safe pattern: {pattern}")
            return True

    # General safe patterns may appear anywhere within the command.
    for pattern in SECURITY_CONFIG.safe_patterns.get("general", []):
        if pattern in command:
            logger.debug(f"Command matches general safe pattern: {pattern}")
            return True

    return False
436 |
437 |
def check_regex_rules(command: str, service: Optional[str] = None) -> Optional[str]:
    """Match a command against the configured regex-based security rules.

    General rules are evaluated first, followed by rules specific to the
    given service (when one is provided and rules exist for it).

    Args:
        command: The command to check
        service: The AWS service being used, if known

    Returns:
        Error message if command matches a regex rule, None otherwise
    """
    # Rules in the "general" category apply to every command.
    for rule in SECURITY_CONFIG.regex_rules.get("general", []):
        if re.search(rule.pattern, command):
            logger.warning(f"Command matches regex rule: {rule.description}")
            return rule.error_message

    # Rules scoped to the detected service, when available.
    if service:
        for rule in SECURITY_CONFIG.regex_rules.get(service, []):
            if re.search(rule.pattern, command):
                logger.warning(f"Command matches service-specific regex rule for {service}: {rule.description}")
                return rule.error_message

    return None
465 |
466 |
def validate_aws_command(command: str) -> None:
    """Validate that the command is a proper AWS CLI command.

    Validation is skipped entirely when SECURITY_MODE is "permissive".
    Otherwise the command must start with 'aws', name a service, pass the
    regex rules, and avoid dangerous command prefixes (unless a safe
    pattern overrides the match).

    Args:
        command: The AWS CLI command to validate

    Raises:
        ValueError: If the command is invalid; also propagated from
            shlex.split for malformed input such as unbalanced quotes
    """
    logger.debug(f"Validating AWS command: {command}")

    # Skip validation in permissive mode
    if SECURITY_MODE.lower() == "permissive":
        logger.warning(f"Running in permissive security mode, skipping validation for: {command}")
        return

    # Basic validation
    cmd_parts = shlex.split(command)
    if not cmd_parts or cmd_parts[0].lower() != "aws":
        raise ValueError("Commands must start with 'aws'")

    if len(cmd_parts) < 2:
        raise ValueError("Command must include an AWS service (e.g., aws s3)")

    # Get the service from the command
    service = cmd_parts[1].lower()

    # Check regex rules first (these apply regardless of service)
    error_message = check_regex_rules(command, service)
    if error_message:
        raise ValueError(error_message)

    # Check against dangerous commands for this service
    if service in SECURITY_CONFIG.dangerous_commands:
        # Check each dangerous command pattern (case-sensitive prefix match
        # against the raw command string, not the shlex-split tokens)
        for dangerous_cmd in SECURITY_CONFIG.dangerous_commands[service]:
            if command.startswith(dangerous_cmd):
                # If it's a dangerous command, check if it's also in safe patterns
                if is_service_command_safe(command, service):
                    return  # Command is safe despite matching dangerous pattern

                # Command is dangerous, raise an error
                raise ValueError(
                    f"This command ({dangerous_cmd}) is restricted for security reasons. "
                    f"Please use a more specific, read-only command or add 'help' or '--help' to see available options."
                )

    logger.debug(f"Command validation successful: {command}")
515 |
516 |
def validate_pipe_command(pipe_command: str) -> None:
    """Validate a command that contains pipes.

    This checks both AWS CLI commands and Unix commands within a pipe chain.
    The first segment must pass validate_aws_command; every subsequent
    segment must be an allowed Unix utility per validate_unix_command.
    Validation is skipped entirely when SECURITY_MODE is "permissive".

    Args:
        pipe_command: The piped command to validate

    Raises:
        ValueError: If any command in the pipe is invalid; also propagated
            from shlex.split for malformed segments (unbalanced quotes)
    """
    logger.debug(f"Validating pipe command: {pipe_command}")

    # Skip validation in permissive mode
    if SECURITY_MODE.lower() == "permissive":
        logger.warning(f"Running in permissive security mode, skipping validation for: {pipe_command}")
        return

    commands = split_pipe_command(pipe_command)

    if not commands:
        raise ValueError("Empty command")

    # First command must be an AWS CLI command
    validate_aws_command(commands[0])

    # Subsequent commands should be valid Unix commands
    for i, cmd in enumerate(commands[1:], 1):
        # An empty token list means a blank segment, e.g. "aws ... | | grep x"
        cmd_parts = shlex.split(cmd)
        if not cmd_parts:
            raise ValueError(f"Empty command at position {i} in pipe")

        if not validate_unix_command(cmd):
            raise ValueError(f"Command '{cmd_parts[0]}' at position {i} in pipe is not allowed. Only AWS commands and basic Unix utilities are permitted.")

    logger.debug(f"Pipe command validation successful: {pipe_command}")
553 |
554 |
def reload_security_config() -> None:
    """Reload security configuration from file.

    This allows for dynamic reloading of security rules without restarting the server.

    Note: this rebinds the module-level SECURITY_CONFIG name; code holding a
    direct reference to the previous SecurityConfig object will not see the update.
    """
    global SECURITY_CONFIG
    SECURITY_CONFIG = load_security_config()
    logger.info("Security configuration reloaded")
563 |
564 |
def validate_command(command: str) -> None:
    """Centralized entry point for validating a command before execution.

    The validation proceeds as follows:

    1. In permissive security mode, skip all checks and return immediately.
    2. Otherwise, dispatch by command shape:
       - piped commands go through validate_pipe_command, which validates
         the leading AWS command and every Unix command after it;
       - plain commands go through validate_aws_command, which checks the
         basic structure, regex rules, and dangerous/safe pattern lists.

    Args:
        command: The command to validate

    Raises:
        ValueError: If the command is invalid with a descriptive error message
    """
    logger.debug(f"Validating command: {command}")

    # Permissive mode bypasses every check.
    if SECURITY_MODE.lower() == "permissive":
        logger.warning(f"Running in permissive security mode, skipping validation for: {command}")
        return

    # Choose the appropriate validator for the command shape and run it.
    validator = validate_pipe_command if is_pipe_command(command) else validate_aws_command
    validator(command)

    logger.debug(f"Command validation successful: {command}")
601 |
```
--------------------------------------------------------------------------------
/tests/unit/test_resources.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for AWS MCP Server resources module.
2 |
3 | These tests verify that the resources functionality in the resources module
4 | works correctly, with appropriate mocking to avoid actual AWS API calls.
5 | """
6 |
7 | import os
8 | from unittest.mock import MagicMock, patch
9 |
10 | import pytest
11 | from botocore.exceptions import ClientError
12 |
13 | from aws_mcp_server.resources import (
14 | _get_region_description,
15 | _get_region_geographic_location,
16 | _mask_key,
17 | get_aws_account_info,
18 | get_aws_environment,
19 | get_aws_profiles,
20 | get_aws_regions,
21 | get_region_available_services,
22 | get_region_details,
23 | register_resources,
24 | )
25 |
26 |
@pytest.fixture
def mock_config_files(monkeypatch, tmp_path):
    """Create mock AWS config and credentials files for testing.

    Builds a temporary ~/.aws directory containing a config file (profiles:
    default, dev, prod) and a credentials file (profiles: default, dev, test),
    then points HOME at the temp dir so the code under test reads these files.

    Returns:
        The temporary path standing in for the user's home directory.
    """
    config_dir = tmp_path / ".aws"
    config_dir.mkdir()

    # Create mock config file
    config_file = config_dir / "config"
    config_file.write_text("[default]\nregion = us-west-2\n\n[profile dev]\nregion = us-east-1\n\n[profile prod]\nregion = eu-west-1\n")

    # Create mock credentials file
    creds_file = config_dir / "credentials"
    creds_file.write_text(
        "[default]\n"
        "aws_access_key_id = AKIADEFAULT000000000\n"
        "aws_secret_access_key = 1234567890abcdef1234567890abcdef\n"
        "\n"
        "[dev]\n"
        "aws_access_key_id = AKIADEV0000000000000\n"
        "aws_secret_access_key = abcdef1234567890abcdef1234567890\n"
        "\n"
        "[test]\n"  # Profile in credentials but not in config
        "aws_access_key_id = AKIATEST000000000000\n"
        "aws_secret_access_key = test1234567890abcdef1234567890ab\n"
    )

    # Set environment to use these files
    monkeypatch.setenv("HOME", str(tmp_path))
    return tmp_path
56 |
57 |
def test_get_aws_profiles(mock_config_files):
    """Test retrieving AWS profiles from config files."""
    profiles = get_aws_profiles()

    # Should find all profiles from both files, with no duplicates
    # ("prod" exists only in config, "test" only in credentials)
    assert set(profiles) == {"default", "dev", "prod", "test"}
64 |
65 |
@patch("boto3.session.Session")
def test_get_aws_regions(mock_session):
    """Test retrieving AWS regions with mocked boto3."""
    # Mock boto3 session and client response
    mock_ec2 = MagicMock()
    mock_session.return_value.client.return_value = mock_ec2

    # EC2 DescribeRegions is returned deliberately unsorted to exercise sorting
    mock_ec2.describe_regions.return_value = {
        "Regions": [
            {"RegionName": "us-east-1"},
            {"RegionName": "us-west-2"},
            {"RegionName": "eu-central-1"},
        ]
    }

    regions = get_aws_regions()

    # Check regions are properly formatted
    assert len(regions) == 3
    assert regions[0]["RegionName"] == "eu-central-1"  # Sorted alphabetically
    assert regions[0]["RegionDescription"] == "EU Central (Frankfurt)"
    assert regions[1]["RegionName"] == "us-east-1"
    assert regions[2]["RegionName"] == "us-west-2"

    # Verify correct session and client creation
    mock_session.assert_called_once()
    mock_session.return_value.client.assert_called_once_with("ec2")
93 |
94 |
@patch("boto3.session.Session")
def test_get_aws_regions_fallback(mock_session):
    """A ClientError from EC2 falls back to the static region list."""
    denied = ClientError({"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, "DescribeRegions")
    mock_session.return_value.client.side_effect = denied

    regions = get_aws_regions()

    # The static fallback covers at least the major regions.
    assert len(regions) >= 12
    names = {r["RegionName"] for r in regions}
    assert {"us-east-1", "eu-west-1"} <= names
107 |
108 |
@patch("boto3.session.Session")
def test_get_aws_environment(mock_session, monkeypatch):
    """Environment info reflects AWS_PROFILE/AWS_REGION and detected credentials."""
    monkeypatch.setenv("AWS_PROFILE", "test-profile")
    monkeypatch.setenv("AWS_REGION", "us-west-2")

    # Credentials resolved via the shared credentials file are reported as "profile".
    creds = MagicMock()
    creds.method = "shared-credentials-file"
    mock_session.return_value.get_credentials.return_value = creds

    env_info = get_aws_environment()

    assert env_info["aws_profile"] == "test-profile"
    assert env_info["aws_region"] == "us-west-2"
    assert env_info["has_credentials"] is True
    assert env_info["credentials_source"] == "profile"
128 |
129 |
@patch("boto3.session.Session")
def test_get_aws_environment_no_credentials(mock_session, monkeypatch):
    """Defaults are reported when no env vars or credentials are present."""
    # Scrub any AWS-related variables that could leak into the test.
    for name in ("AWS_PROFILE", "AWS_REGION", "AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID"):
        if name in os.environ:
            monkeypatch.delenv(name)

    # boto3 resolves no credentials at all.
    mock_session.return_value.get_credentials.return_value = None

    env_info = get_aws_environment()

    assert env_info["aws_profile"] == "default"
    assert env_info["aws_region"] == "us-east-1"
    assert env_info["has_credentials"] is False
    assert env_info["credentials_source"] == "none"
148 |
149 |
@patch("boto3.session.Session")
def test_get_aws_account_info(mock_session):
    """Account id, alias and organization id come from STS/IAM/Organizations."""
    clients = {"sts": MagicMock(), "iam": MagicMock(), "organizations": MagicMock()}
    mock_session.return_value.client.side_effect = lambda service: clients[service]

    clients["sts"].get_caller_identity.return_value = {"Account": "123456789012"}
    clients["iam"].list_account_aliases.return_value = {"AccountAliases": ["my-account"]}
    clients["organizations"].describe_organization.return_value = {"OrganizationId": "o-abcdef1234"}

    account_info = get_aws_account_info()

    assert account_info["account_id"] == "123456789012"
    assert account_info["account_alias"] == "my-account"
    assert account_info["organization_id"] == "o-abcdef1234"
171 |
172 |
@patch("boto3.session.Session")
def test_get_aws_account_info_minimal(mock_session):
    """Only the account id is returned when IAM/Organizations access is denied."""
    sts_stub = MagicMock()
    sts_stub.get_caller_identity.return_value = {"Account": "123456789012"}

    def client_factory(service):
        # STS succeeds; every other client raises an access-denied error.
        if service == "sts":
            return sts_stub
        if service == "iam":
            raise ClientError({"Error": {"Code": "AccessDenied"}}, "ListAccountAliases")
        raise ClientError({"Error": {"Code": "AccessDenied"}}, "DescribeAccount")

    mock_session.return_value.client.side_effect = client_factory

    account_info = get_aws_account_info()

    assert account_info["account_id"] == "123456789012"
    assert account_info["account_alias"] is None
    assert account_info["organization_id"] is None
198 |
199 |
def test_register_resources():
    """register_resources wires up all five AWS config resources on the MCP server."""
    mock_mcp = MagicMock()

    register_resources(mock_mcp)

    assert mock_mcp.resource.call_count == 5

    # Expected (name, description) registrations keyed by URI for direct lookup.
    expected = {
        "aws://config/profiles": ("aws_profiles", "Get available AWS profiles"),
        "aws://config/regions": ("aws_regions", "Get available AWS regions"),
        "aws://config/regions/{region}": ("aws_region_details", "Get detailed information about a specific AWS region"),
        "aws://config/environment": ("aws_environment", "Get AWS environment information"),
        "aws://config/account": ("aws_account", "Get AWS account information"),
    }

    for call in mock_mcp.resource.call_args_list:
        uri = call.kwargs.get("uri")
        assert uri in expected, f"URI {uri} not found in expected resources"
        name, description = expected[uri]
        assert call.kwargs.get("name") == name
        assert call.kwargs.get("description") == description
231 |
232 |
def test_get_region_description():
    """Known regions map to friendly names; unknown codes get a generic label."""
    known = {
        "us-east-1": "US East (N. Virginia)",
        "eu-west-2": "EU West (London)",
        "ap-southeast-1": "Asia Pacific (Singapore)",
    }
    for code, description in known.items():
        assert _get_region_description(code) == description

    # Unrecognized codes fall back to a "AWS Region <code>" label.
    for code in ("unknown-region-1", "test-region-2"):
        assert _get_region_description(code) == f"AWS Region {code}"
243 |
244 |
def test_mask_key():
    """_mask_key keeps up to the first three characters and stars the rest."""
    cases = {
        "": "",  # empty input passes through
        "a": "a",  # keys of <= 3 chars are returned unchanged
        "ab": "ab",
        "abc": "abc",
        "abcd": "abc*",  # everything past position 3 is masked
        "abcdef": "abc***",
        "AKIAIOSFODNN7EXAMPLE": "AKI*****************",
    }
    for raw, masked in cases.items():
        assert _mask_key(raw) == masked
259 |
260 |
@patch("configparser.ConfigParser")
@patch("os.path.exists")
def test_get_aws_profiles_exception(mock_exists, mock_config_parser):
    """A config-parsing failure falls back to just the default profile."""
    mock_exists.return_value = True

    # Reading the config file blows up.
    parser_stub = MagicMock()
    parser_stub.read.side_effect = Exception("Config file error")
    mock_config_parser.return_value = parser_stub

    assert get_aws_profiles() == ["default"]
    assert parser_stub.read.called
279 |
280 |
@patch("boto3.session.Session")
def test_get_aws_regions_generic_exception(mock_session):
    """Non-ClientError failures yield an empty region list."""
    mock_session.return_value.client.side_effect = Exception("Generic error")

    regions = get_aws_regions()

    # Generic exceptions (unlike ClientError) do not trigger the static fallback.
    assert isinstance(regions, list)
    assert len(regions) == 0
293 |
294 |
@patch("boto3.session.Session")
def test_get_aws_environment_credential_methods(mock_session):
    """Each boto3 credential method maps to the expected credentials_source label."""
    method_to_source = {
        "environment": "environment",
        "iam-role": "instance-profile",
        "assume-role": "assume-role",
        "container-role": "container-role",
        "unknown-method": "profile",  # unrecognized methods fall back to "profile"
    }

    for method, expected_source in method_to_source.items():
        mock_session.reset_mock()

        creds = MagicMock()
        creds.method = method
        mock_session.return_value.get_credentials.return_value = creds

        env_info = get_aws_environment()

        assert env_info["has_credentials"] is True
        assert env_info["credentials_source"] == expected_source
322 |
323 |
@patch("boto3.session.Session")
def test_get_aws_environment_exception(mock_session):
    """Credential lookup errors still produce defaulted environment info."""
    mock_session.return_value.get_credentials.side_effect = Exception("Credential error")

    env_info = get_aws_environment()

    # The function must not propagate the error; defaults are used instead.
    assert env_info["aws_profile"] == "default"
    assert env_info["aws_region"] == "us-east-1"
    assert env_info["has_credentials"] is False
    assert env_info["credentials_source"] == "none"
338 |
339 |
@patch("boto3.session.Session")
def test_get_aws_account_info_with_org(mock_session):
    """organization_id stays None when DescribeOrganization returns no ID."""
    clients = {"sts": MagicMock(), "iam": MagicMock(), "organizations": MagicMock()}
    mock_session.return_value.client.side_effect = lambda service: clients[service]

    clients["sts"].get_caller_identity.return_value = {"Account": "123456789012"}
    clients["iam"].list_account_aliases.return_value = {"AccountAliases": ["my-account"]}
    # The Organizations call succeeds but carries no OrganizationId value.
    clients["organizations"].describe_organization.return_value = {"OrganizationId": None}

    account_info = get_aws_account_info()

    assert account_info["account_id"] == "123456789012"
    assert account_info["account_alias"] == "my-account"
    assert account_info["organization_id"] is None
364 |
365 |
@patch("boto3.session.Session")
def test_get_aws_account_info_general_exception(mock_session):
    """A generic boto3 failure leaves every account field as None."""
    mock_session.return_value.client.side_effect = Exception("Generic error")

    account_info = get_aws_account_info()

    for field in ("account_id", "account_alias", "organization_id"):
        assert account_info[field] is None
379 |
380 |
@patch("aws_mcp_server.resources.get_aws_profiles")
@patch("os.environ.get")
def test_resource_aws_profiles(mock_environ_get, mock_get_aws_profiles):
    """The aws_profiles resource marks the active profile among all profiles."""
    import asyncio

    mock_environ_get.return_value = "test-profile"
    mock_get_aws_profiles.return_value = ["default", "test-profile", "dev"]

    # Simulate the decorated resource function using the mocked helpers
    # (the mocks must be consulted, not the real implementations).
    async def fake_resource():
        profiles = mock_get_aws_profiles.return_value
        active = mock_environ_get.return_value
        return {"profiles": [{"name": p, "is_current": p == active} for p in profiles]}

    result = asyncio.run(fake_resource())

    assert "profiles" in result
    assert len(result["profiles"]) == 3

    # Exactly the active profile should be flagged as current.
    marked = [p["name"] for p in result["profiles"] if p["is_current"]]
    assert marked[-1] == "test-profile"
414 |
415 |
@patch("aws_mcp_server.resources.get_aws_regions")
@patch("os.environ.get")
def test_resource_aws_regions(mock_environ_get, mock_get_aws_regions):
    """The aws_regions resource marks the active region among all regions."""
    import asyncio

    # AWS_REGION and AWS_DEFAULT_REGION both resolve to us-west-2.
    mock_environ_get.side_effect = lambda key, default=None: "us-west-2" if key in ("AWS_REGION", "AWS_DEFAULT_REGION") else default

    mock_get_aws_regions.return_value = [
        {"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
        {"RegionName": "us-west-2", "RegionDescription": "US West (Oregon)"},
    ]

    # Simulate the decorated resource function using the mocked helpers
    # (the mocks must be consulted, not the real implementations).
    async def fake_resource():
        active = "us-west-2"  # what mock_environ_get resolves to above
        return {
            "regions": [
                {
                    "name": region["RegionName"],
                    "description": region["RegionDescription"],
                    "is_current": region["RegionName"] == active,
                }
                for region in mock_get_aws_regions.return_value
            ]
        }

    result = asyncio.run(fake_resource())

    assert "regions" in result
    assert len(result["regions"]) == 2

    # Exactly the active region should be flagged as current.
    marked = [r["name"] for r in result["regions"] if r["is_current"]]
    assert marked[-1] == "us-west-2"
461 |
462 |
@patch("aws_mcp_server.resources.get_aws_environment")
def test_resource_aws_environment(mock_get_aws_environment):
    """The aws_environment resource returns the helper's payload unchanged."""
    import asyncio

    env_payload = {
        "aws_profile": "test-profile",
        "aws_region": "us-west-2",
        "has_credentials": True,
        "credentials_source": "profile",
    }
    mock_get_aws_environment.return_value = env_payload

    # Simulate the decorated resource function delegating to the mocked helper.
    async def fake_resource():
        return mock_get_aws_environment.return_value

    assert asyncio.run(fake_resource()) == env_payload
487 |
488 |
@patch("aws_mcp_server.resources.get_aws_account_info")
def test_resource_aws_account(mock_get_aws_account_info):
    """The aws_account resource returns the helper's payload unchanged."""
    import asyncio

    account_payload = {
        "account_id": "123456789012",
        "account_alias": "test-account",
        "organization_id": "o-abcdef123456",
    }
    mock_get_aws_account_info.return_value = account_payload

    # Simulate the decorated resource function delegating to the mocked helper.
    async def fake_resource():
        return mock_get_aws_account_info.return_value

    assert asyncio.run(fake_resource()) == account_payload
512 |
513 |
def test_get_region_geographic_location():
    """Known regions carry continent/country/city; unknown ones report Unknown."""
    expectations = {
        "us-east-1": {"continent": "North America", "country": "United States", "city": "Ashburn, Virginia"},
        "eu-west-2": {"continent": "Europe", "country": "United Kingdom", "city": "London"},
        # Unrecognized codes get "Unknown" for every field.
        "unknown-region": {"continent": "Unknown", "country": "Unknown", "city": "Unknown"},
    }
    for region, expected in expectations.items():
        location = _get_region_geographic_location(region)
        for field, value in expected.items():
            assert location[field] == value
532 |
533 |
@patch("boto3.session.Session")
def test_get_region_available_services(mock_session):
    """Service Quotas results are normalized into {id, name} service entries."""
    quotas_stub = MagicMock()

    def client_factory(service_name, **kwargs):
        # Only the service-quotas client matters for this test.
        return quotas_stub if service_name == "service-quotas" else MagicMock()

    mock_session.return_value.client.side_effect = client_factory

    quotas_stub.list_services.return_value = {
        "Services": [
            {"ServiceCode": "AWS.EC2", "ServiceName": "Amazon Elastic Compute Cloud"},
            {"ServiceCode": "AWS.S3", "ServiceName": "Amazon Simple Storage Service"},
            {"ServiceCode": "Lambda", "ServiceName": "AWS Lambda"},
            {"ServiceCode": "Organizations", "ServiceName": "AWS Organizations"},
            {"ServiceCode": "AWS.CloudFormation", "ServiceName": "AWS CloudFormation"},
        ],
        "NextToken": None,
    }

    services = get_region_available_services(mock_session.return_value, "us-east-1")

    assert len(services) == 5

    # "AWS." prefixes are stripped and codes are lowercased.
    for entry in (
        {"id": "ec2", "name": "Amazon Elastic Compute Cloud"},
        {"id": "s3", "name": "Amazon Simple Storage Service"},
        {"id": "lambda", "name": "AWS Lambda"},
        {"id": "organizations", "name": "AWS Organizations"},
        {"id": "cloudformation", "name": "AWS CloudFormation"},
    ):
        assert entry in services

    quotas_stub.list_services.assert_called_once()
575 |
576 |
@patch("boto3.session.Session")
def test_get_region_available_services_pagination(mock_session):
    """NextToken pagination of ListServices is followed until exhausted."""
    quotas_stub = MagicMock()
    mock_session.return_value.client.return_value = quotas_stub

    # Two pages: the first returns a NextToken, the second terminates.
    quotas_stub.list_services.side_effect = [
        {
            "Services": [
                {"ServiceCode": "AWS.EC2", "ServiceName": "Amazon Elastic Compute Cloud"},
                {"ServiceCode": "AWS.S3", "ServiceName": "Amazon Simple Storage Service"},
            ],
            "NextToken": "next-token-1",
        },
        {
            "Services": [{"ServiceCode": "Lambda", "ServiceName": "AWS Lambda"}, {"ServiceCode": "AWS.DynamoDB", "ServiceName": "Amazon DynamoDB"}],
            "NextToken": None,
        },
    ]

    services = get_region_available_services(mock_session.return_value, "us-east-1")

    # Both pages contribute entries.
    assert len(services) == 4

    assert quotas_stub.list_services.call_count == 2
    quotas_stub.list_services.assert_any_call()  # first page carries no token
    quotas_stub.list_services.assert_any_call(NextToken="next-token-1")  # second page continues
613 |
614 |
@patch("boto3.session.Session")
def test_get_region_available_services_fallback(mock_session):
    """When Service Quotas is unavailable, the client-creation fallback is used."""

    def client_factory(service_name, **kwargs):
        if service_name == "service-quotas":
            raise ClientError({"Error": {"Code": "AccessDenied"}}, "ListServices")
        # Any other client "exists", signalling the service is available.
        return MagicMock()

    mock_session.return_value.client.side_effect = client_factory

    services = get_region_available_services(mock_session.return_value, "us-east-1")

    # The fallback still produces a non-empty result.
    assert len(services) > 0

    # Common services should be detectable via client creation.
    ids = [entry["id"] for entry in services]
    for expected_id in ("ec2", "s3", "lambda"):
        assert expected_id in ids

    # Every entry follows the {id, name} structure.
    for entry in services:
        assert "id" in entry
        assert "name" in entry
643 |
644 |
@patch("aws_mcp_server.resources.get_region_available_services")
@patch("boto3.session.Session")
def test_get_region_details(mock_session, mock_get_region_available_services):
    """Region details combine static metadata, EC2 AZs, and the service list."""
    ec2_stub = MagicMock()

    def client_factory(service_name, **kwargs):
        # Only EC2 matters here; other clients are inert mocks.
        return ec2_stub if service_name == "ec2" else MagicMock()

    mock_session.return_value.client.side_effect = client_factory

    ec2_stub.describe_availability_zones.return_value = {
        "AvailabilityZones": [
            {"ZoneName": "us-east-1a", "State": "available", "ZoneId": "use1-az1", "ZoneType": "availability-zone"},
            {"ZoneName": "us-east-1b", "State": "available", "ZoneId": "use1-az2", "ZoneType": "availability-zone"},
        ]
    }

    service_list = [{"id": "ec2", "name": "EC2"}, {"id": "s3", "name": "S3"}, {"id": "lambda", "name": "Lambda"}]
    mock_get_region_available_services.return_value = service_list

    region_details = get_region_details("us-east-1")

    # Static metadata for the region code.
    assert region_details["code"] == "us-east-1"
    assert region_details["name"] == "US East (N. Virginia)"

    location = region_details["geographic_location"]
    assert location["continent"] == "North America"
    assert location["country"] == "United States"
    assert location["city"] == "Ashburn, Virginia"

    # AZs come from the stubbed EC2 response.
    assert len(region_details["availability_zones"]) == 2
    assert [az["name"] for az in region_details["availability_zones"]] == ["us-east-1a", "us-east-1b"]

    # The services list is delegated to get_region_available_services.
    assert region_details["services"] == service_list
    mock_get_region_available_services.assert_called_once_with(mock_session.return_value, "us-east-1")
694 |
695 |
@patch("aws_mcp_server.resources.get_region_available_services")
@patch("boto3.session.Session")
def test_get_region_details_with_error(mock_session, mock_get_region_available_services):
    """Static region metadata survives even when the AWS APIs are denied."""
    denied = ClientError({"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, "DescribeAvailabilityZones")
    mock_session.return_value.client.side_effect = denied
    mock_get_region_available_services.return_value = []

    region_details = get_region_details("us-east-1")

    # Static information remains available despite the API failures.
    assert region_details["code"] == "us-east-1"
    assert region_details["name"] == "US East (N. Virginia)"
    assert "geographic_location" in region_details
    assert len(region_details["availability_zones"]) == 0
    assert region_details["services"] == []
    mock_get_region_available_services.assert_called_once_with(mock_session.return_value, "us-east-1")
716 |
717 |
@patch("aws_mcp_server.resources.get_region_details")
def test_resource_aws_region_details(mock_get_region_details):
    """The aws_region_details resource delegates to get_region_details."""
    import asyncio

    details_payload = {
        "code": "us-east-1",
        "name": "US East (N. Virginia)",
        "geographic_location": {"continent": "North America", "country": "United States", "city": "Ashburn, Virginia"},
        "availability_zones": [
            {"name": "us-east-1a", "state": "available", "zone_id": "use1-az1", "zone_type": "availability-zone"},
            {"name": "us-east-1b", "state": "available", "zone_id": "use1-az2", "zone_type": "availability-zone"},
        ],
        "services": [{"id": "ec2", "name": "EC2"}, {"id": "s3", "name": "S3"}, {"id": "lambda", "name": "Lambda"}],
        "is_current": True,
    }
    mock_get_region_details.return_value = details_payload

    # Simulate the decorated resource function forwarding the region argument.
    async def fake_resource(region: str):
        return mock_get_region_details(region)

    result = asyncio.run(fake_resource("us-east-1"))

    mock_get_region_details.assert_called_once_with("us-east-1")
    assert result == details_payload
750 |
```
--------------------------------------------------------------------------------
/src/aws_mcp_server/prompts.py:
--------------------------------------------------------------------------------
```python
"""AWS CLI prompt definitions for the AWS MCP Server.

This module provides a collection of useful prompt templates for common AWS use cases.
These prompts help ensure consistent best practices and efficient AWS resource management.
"""

import logging

# Module-level logger; configuration is inherited from the application's logging setup.
logger = logging.getLogger(__name__)
10 |
11 |
12 | def register_prompts(mcp):
13 | """Register all prompts with the MCP server instance.
14 |
15 | Args:
16 | mcp: The FastMCP server instance
17 | """
18 | logger.info("Registering AWS prompt templates")
19 |
20 | @mcp.prompt(name="create_resource", description="Generate AWS CLI commands to create common AWS resources with best practices")
21 | def create_resource(resource_type: str, resource_name: str) -> str:
22 | """Generate AWS CLI commands to create common AWS resources with best practices.
23 |
24 | Args:
25 | resource_type: Type of AWS resource to create (e.g., s3-bucket, ec2-instance, lambda)
26 | resource_name: Name for the new resource
27 |
28 | Returns:
29 | Formatted prompt string for resource creation
30 | """
31 | return f"""Generate the AWS CLI commands to create a new {resource_type} named {resource_name}
32 | following AWS Well-Architected Framework best practices.
33 |
34 | Please include:
35 | 1. The primary creation command with appropriate security settings
36 | 2. Any supporting resources needed (roles, policies, etc.)
37 | 3. Required tagging commands (Name, Environment, Purpose, Owner, Cost-Center)
38 | 4. Security hardening commands to enforce principle of least privilege
39 | 5. Encryption and data protection configuration
40 | 6. Verification commands to confirm successful creation
41 |
42 | Ensure the solution includes:
43 | - Proper encryption at rest and in transit
44 | - Secure access control mechanisms
45 | - Resource policies with appropriate permissions
46 | - Monitoring and logging setup with CloudWatch
47 | - Cost optimization considerations
48 |
49 | For IAM roles and policies, follow the principle of least privilege and explain any important
50 | security considerations specific to this resource type."""
51 |
52 | @mcp.prompt(name="security_audit", description="Generate AWS CLI commands for performing a security audit on a service")
53 | def security_audit(service: str) -> str:
54 | """Generate AWS CLI commands for performing a security audit on a service.
55 |
56 | Args:
57 | service: AWS service to audit (e.g., s3, ec2, iam, rds)
58 |
59 | Returns:
60 | Formatted prompt string for security auditing
61 | """
62 | return f"""Generate AWS CLI commands to perform a comprehensive security audit
63 | of {service} resources in my AWS account according to AWS Security Hub and Well-Architected Framework.
64 |
65 | Include commands to:
66 | 1. Identify resources with public access, excessive permissions, or security group vulnerabilities
67 | 2. Detect weak or unused security configurations and access controls
68 | 3. Check for unencrypted data (both at rest and in transit)
69 | 4. Verify enabled logging and monitoring capabilities
70 | 5. Assess IAM roles and policies attached to resources for overly permissive settings
71 | 6. Check for resource compliance with CIS AWS Foundations Benchmark
72 | 7. Identify potential security misconfigurations based on AWS security best practices
73 | 8. Detect unused credentials, access keys, and permissions
74 |
75 | Also provide:
76 | - Security findings categorized by severity (High, Medium, Low)
77 | - A prioritized list of remediation steps with corresponding CLI commands
78 | - Recommendations to implement automated security checks using AWS Config Rules"""
79 |
80 | @mcp.prompt(name="cost_optimization", description="Generate AWS CLI commands for cost optimization recommendations")
81 | def cost_optimization(service: str) -> str:
82 | """Generate AWS CLI commands for cost optimization recommendations.
83 |
84 | Args:
85 | service: AWS service to optimize costs for
86 |
87 | Returns:
88 | Formatted prompt string for cost optimization
89 | """
90 | return f"""Generate AWS CLI commands to identify cost optimization opportunities
91 | for {service} in my AWS account using AWS Cost Explorer and other cost management tools.
92 |
93 | Include commands to:
94 | 1. Find unused, idle, or underutilized resources with detailed utilization metrics
95 | 2. Identify resources that could be rightsized, downsized, or use a different pricing model
96 | 3. Detect patterns of usage that could benefit from Reserved Instances, Savings Plans, or Spot instances
97 | 4. Analyze resources without proper cost allocation tags and suggest tagging strategies
98 | 5. Generate a detailed cost breakdown by resource for the past 30 days
99 | 6. Identify optimal instance families based on workload patterns
100 | 7. Find opportunities to utilize AWS Graviton processors for better price-performance ratio
101 | 8. Check for resources that can leverage multi-region strategies for cost efficiency
102 |
103 | Also provide:
104 | - Cost-saving estimates for each recommendation
105 | - Commands to implement automated cost management using AWS Budgets
106 | - Scripts to schedule automated start/stop for dev/test environments
107 | - Best practices for implementing FinOps for {service}"""
108 |
109 | @mcp.prompt(name="resource_inventory", description="Generate AWS CLI commands to inventory resources for a service")
110 | def resource_inventory(service: str, region: str = "all") -> str:
111 | """Generate AWS CLI commands to inventory resources for a service.
112 |
113 | Args:
114 | service: AWS service to inventory (e.g., s3, ec2, rds)
115 | region: AWS region or "all" for multi-region inventory
116 |
117 | Returns:
118 | Formatted prompt string for resource inventory
119 | """
120 | region_text = f"in the {region} region" if region != "all" else "across all regions"
121 |
122 | return f"""Generate AWS CLI commands to create a comprehensive inventory
123 | of all {service} resources {region_text}.
124 |
125 | Include commands to:
126 | 1. List all resources with their key properties, metadata, and creation dates
127 | 2. Show resource relationships, dependencies, and associated infrastructure
128 | 3. Display resource tags, ownership information, and cost allocation
129 | 4. Identify untagged, potentially abandoned, or non-compliant resources
130 | 5. Export the inventory in structured formats (JSON, CSV) for further analysis
131 | 6. Group resources by type, status, size, and configuration
132 | 7. Include usage metrics and performance data where applicable
133 | 8. List attached IAM roles, policies, and security configurations
134 |
135 | Structure the commands to output to easily parsable formats that can be programmatically processed.
136 | Include jq filters to transform complex JSON output into useful summaries."""
137 |
138 | @mcp.prompt(name="troubleshoot_service", description="Generate AWS CLI commands for troubleshooting service issues")
139 | def troubleshoot_service(service: str, resource_id: str) -> str:
140 | """Generate AWS CLI commands for troubleshooting service issues.
141 |
142 | Args:
143 | service: AWS service to troubleshoot (e.g., ec2, rds, lambda)
144 | resource_id: ID of the specific resource having issues
145 |
146 | Returns:
147 | Formatted prompt string for troubleshooting
148 | """
149 | return f"""Generate AWS CLI commands to troubleshoot issues with {service}
150 | resource {resource_id} using a systematic diagnostic approach.
151 |
152 | Include commands to:
153 | 1. Check resource status, health, configuration, and performance metrics
154 | 2. Review recent changes, modifications, deployments, or infrastructure updates
155 | 3. Examine detailed logs, metrics, alarm history, and error patterns from CloudWatch
156 | 4. Verify network connectivity, security groups, NACLs, and routing settings
157 | 5. Diagnose potential service limits, throttling, or quota issues
158 | 6. Check for dependent services and connectivity between resources
159 | 7. Analyze IAM permissions and resource policies that might affect access
160 | 8. Validate configuration against AWS best practices and common failure patterns
161 |
162 | Structure the troubleshooting as a systematic process from:
163 | - Basic health and status verification
164 | - Configuration and recent changes analysis
165 | - Performance and resource utilization assessment
166 | - Network and connectivity validation
167 | - IAM and security verification
168 | - Dependent services analysis
169 | - Logging and monitoring data collection
170 |
171 | Include commands to collect all relevant diagnostic information into a single report that can be shared with AWS Support if needed."""
172 |
173 | @mcp.prompt(name="iam_policy_generator", description="Generate least-privilege IAM policies for specific services and actions")
174 | def iam_policy_generator(service: str, actions: str, resource_pattern: str = "*") -> str:
175 | """Generate least-privilege IAM policies for specific services and actions.
176 |
177 | Args:
178 | service: AWS service for the policy (e.g., s3, dynamodb)
179 | actions: Comma-separated list of actions (e.g., "GetObject,PutObject")
180 | resource_pattern: Resource ARN pattern (e.g., "arn:aws:s3:::my-bucket/*")
181 |
182 | Returns:
183 | Formatted prompt string for IAM policy generation
184 | """
185 | return f"""Generate a least-privilege IAM policy that allows only the required permissions
186 | for {service} with these specific actions: {actions}.
187 |
188 | Resource pattern: {resource_pattern}
189 |
190 | The policy should:
191 | 1. Follow AWS IAM security best practices and use the latest policy structure
192 | 2. Include only the minimum permissions needed for the stated actions
193 | 3. Use proper condition keys to restrict access by source IP, VPC, time, MFA, etc.
194 | 4. Implement appropriate resource-level permissions where supported
195 | 5. Include explanatory comments for each permission block
196 | 6. Use AWS managed policies where appropriate to reduce maintenance overhead
197 | 7. Be ready to use with the AWS CLI for policy creation
198 |
199 | Also provide:
200 | - The AWS CLI command to apply this policy to a role or user
201 | - Best practice recommendations for using policy boundaries
202 | - Explanation of potential security impact if permissions are too broad
203 | - Alternative permissions strategies if applicable (e.g., attribute-based access control)"""
204 |
205 | @mcp.prompt(name="service_monitoring", description="Generate AWS CLI commands to set up monitoring for a service")
206 | def service_monitoring(service: str, metric_type: str = "performance") -> str:
207 | """Generate AWS CLI commands to set up monitoring for a service.
208 |
209 | Args:
210 | service: AWS service to monitor (e.g., ec2, rds, lambda)
211 | metric_type: Type of metrics to monitor (e.g., performance, cost, security)
212 |
213 | Returns:
214 | Formatted prompt string for monitoring setup
215 | """
216 | return f"""Generate AWS CLI commands to set up comprehensive {metric_type} monitoring
217 | for {service} resources using CloudWatch, X-Ray, and other observability tools.
218 |
219 | Include commands to:
220 | 1. Create CloudWatch dashboards with relevant metrics and service-specific KPIs
221 | 2. Set up appropriate CloudWatch alarms with actionable thresholds and anomaly detection
222 | 3. Configure detailed logging with Log Insights queries for common analysis patterns
223 | 4. Enable AWS X-Ray tracing for distributed systems analysis where applicable
225 | 5. Create SNS topics and subscriptions for multi-channel notifications (email, Slack, PagerDuty)
225 | 6. Set up metric filters to extract critical information from log patterns
226 | 7. Configure composite alarms for complex monitoring scenarios
227 | 8. Enable AWS Service Health Dashboard notifications for service issues
228 |
229 | The monitoring solution should include:
230 | - Resource-specific metrics that indicate health and performance
231 | - Operational thresholds based on industry best practices
232 | - Multi-tier alerting with different severity levels
233 | - Automated remediation actions where appropriate
234 | - Integration with incident management workflows
235 |
236 | Ensure the commands follow operational excellence best practices from the Well-Architected Framework."""
237 |
238 | @mcp.prompt(name="disaster_recovery", description="Generate AWS CLI commands to implement disaster recovery for a service")
239 | def disaster_recovery(service: str, recovery_point_objective: str = "1 hour") -> str:
240 | """Generate AWS CLI commands to implement disaster recovery for a service.
241 |
242 | Args:
243 | service: AWS service to protect (e.g., ec2, rds, dynamodb)
244 | recovery_point_objective: Target RPO (e.g., "1 hour", "15 minutes")
245 |
246 | Returns:
247 | Formatted prompt string for DR setup
248 | """
249 | return f"""Generate AWS CLI commands to implement a disaster recovery solution
250 | for {service} with a Recovery Point Objective (RPO) of {recovery_point_objective} and minimal Recovery Time Objective (RTO).
251 |
252 | Include commands to:
253 | 1. Configure appropriate backup mechanisms (snapshots, replication, AWS Backup)
254 | 2. Set up cross-region or cross-account redundancy with proper data synchronization
255 | 3. Create automation for recovery processes using AWS Systems Manager documents
256 | 4. Implement comprehensive monitoring and alerting for backup failures
257 | 5. Define validation procedures to verify recovery readiness and integrity
258 | 6. Set up regular DR testing through automation
259 | 7. Configure failover mechanisms and DNS routing strategies using Route 53
260 | 8. Implement data integrity checks for backups and replicas
261 |
262 | The solution should:
263 | - Balance cost effectiveness with meeting the specified RPO
264 | - Follow AWS Well-Architected Framework best practices for reliability
265 | - Include automated recovery procedures that minimize manual intervention
266 | - Provide appropriate IAM roles and permissions for DR operations
267 | - Consider regional service availability differences
268 | - Include both data and configuration recovery"""
269 |
270 | @mcp.prompt(name="compliance_check", description="Generate AWS CLI commands to check compliance with standards")
271 | def compliance_check(compliance_standard: str, service: str = "all") -> str:
272 | """Generate AWS CLI commands to check compliance with standards.
273 |
274 | Args:
275 | compliance_standard: Compliance standard to check (e.g., "HIPAA", "PCI", "GDPR")
276 | service: Specific AWS service or "all" for account-wide checks
277 |
278 | Returns:
279 | Formatted prompt string for compliance checking
280 | """
281 | service_scope = f"for {service}" if service != "all" else "across all relevant services"
282 |
283 | return f"""Generate AWS CLI commands to assess {compliance_standard} compliance {service_scope}
284 | using AWS Config, AWS Security Hub, and AWS Audit Manager.
285 |
286 | Include commands to:
287 | 1. Identify resources that may not meet {compliance_standard} compliance requirements
288 | 2. Check encryption settings, key management, and data protection measures
289 | 3. Audit access controls, authentication mechanisms, and privilege management
290 | 4. Verify logging, monitoring configurations, and audit trail completeness
291 | 5. Assess network security, isolation, and boundary protection
292 | 6. Evaluate resource configurations against specific {compliance_standard} controls
293 | 7. Check for compliant tagging and resource documentation
294 | 8. Analyze retention policies for backups, logs, and archived data
295 |
296 | Also provide:
297 | - Remediation commands for common compliance gaps with {compliance_standard}
298 | - Explanation of specific {compliance_standard} requirements being checked
299 | - Commands to generate compliance reports using AWS Audit Manager
300 | - Instructions to set up continuous compliance monitoring
301 | - Best practices for maintaining ongoing compliance"""
302 |
303 | @mcp.prompt(name="resource_cleanup", description="Generate AWS CLI commands to identify and cleanup unused resources")
304 | def resource_cleanup(service: str, criteria: str = "unused") -> str:
305 | """Generate AWS CLI commands to identify and cleanup unused resources.
306 |
307 | Args:
308 | service: AWS service to cleanup (e.g., ec2, ebs, rds)
309 | criteria: Criteria for cleanup (e.g., "unused", "old", "untagged")
310 |
311 | Returns:
312 | Formatted prompt string for resource cleanup
313 | """
314 | return f"""Generate AWS CLI commands to identify and safely clean up {criteria} {service} resources
315 | to reduce costs and improve account hygiene.
316 |
317 | Include commands to:
318 | 1. Identify resources matching the {criteria} criteria with appropriate filters and metrics
319 | 2. Generate a detailed report of resources before deletion for review and approval
320 | 3. Create backups, snapshots, or exports where appropriate before removal
321 | 4. Safely delete or terminate the identified resources with proper validation
322 | 5. Verify successful cleanup and calculate actual cost savings
323 | 6. Check for orphaned dependent resources (volumes, snapshots, ENIs)
324 | 7. Identify resources that could be scheduled for regular cleanup
325 | 8. Capture resource metadata before deletion for audit purposes
326 |
327 | The commands should include:
328 | - Appropriate safeguards to prevent accidental deletion of critical resources
329 | - Dry-run options to preview changes before execution
330 | - Validation checks to ensure resources are truly unused
331 | - Tag-based identification of approved resources to preserve
332 | - Staged approach that isolates resources before deletion
333 | - Estimate of cost savings from cleanup activities
334 |
335 | Follow AWS operational best practices and include error handling."""
336 |
337 | @mcp.prompt(name="serverless_deployment", description="Generate AWS CLI commands to deploy a serverless application")
338 | def serverless_deployment(application_name: str, runtime: str = "python3.13") -> str:
339 | """Generate AWS CLI commands to deploy a serverless application.
340 |
341 | Args:
342 | application_name: Name for the serverless application
343 | runtime: Runtime environment (e.g., "python3.13", "nodejs20.x", "java17")
344 |
345 | Returns:
346 | Formatted prompt string for serverless deployment
347 | """
348 | return f"""Generate AWS CLI commands to deploy a serverless application named {application_name}
349 | using AWS SAM, Lambda, API Gateway, and DynamoDB with {runtime} runtime.
350 |
351 | Include commands to:
352 | 1. Initialize a new SAM application with best practices structure
353 | 2. Create necessary Lambda functions with appropriate IAM roles
354 | 3. Set up API Gateway endpoints with proper authorization
355 | 4. Deploy DynamoDB tables with optimal capacity and indexing
356 | 5. Configure CloudWatch Logs and X-Ray tracing
357 | 6. Set up CI/CD pipeline using AWS CodePipeline
358 | 7. Implement proper versioning and deployment strategies (canary, linear)
359 | 8. Create CloudFormation custom resources if needed
360 |
361 | The deployment should follow serverless best practices:
362 | - Appropriate function timeouts and memory allocation
363 | - Least privilege IAM permissions for each component
364 | - Parameter Store or Secrets Manager for configuration
365 | - Proper error handling and dead-letter queues
366 | - Efficient cold start optimization
367 | - Secure API authorization (JWT, IAM, Cognito)
368 | - Cost-effective resource utilization
369 |
370 | Include commands to verify the deployment and test the application endpoints."""
371 |
372 | @mcp.prompt(name="container_orchestration", description="Generate AWS CLI commands to set up container orchestration")
373 | def container_orchestration(cluster_name: str, service_type: str = "fargate") -> str:
374 | """Generate AWS CLI commands to set up container orchestration.
375 |
376 | Args:
377 | cluster_name: Name for the ECS/EKS cluster
378 | service_type: Type of service (e.g., "fargate", "ec2", "eks")
379 |
380 | Returns:
381 | Formatted prompt string for container deployment
382 | """
383 | return f"""Generate AWS CLI commands to set up a container orchestration environment
384 | with a {service_type} cluster named {cluster_name} following AWS best practices.
385 |
386 | Include commands to:
387 | 1. Create the {service_type} cluster with appropriate networking and security settings
388 | 2. Set up necessary IAM roles, task execution roles, and service roles
389 | 3. Configure task definitions with optimal resource allocation
390 | 4. Deploy services with appropriate scaling policies and load balancing
391 | 5. Implement service discovery and container insights monitoring
392 | 6. Set up logging and metric collection for containers
393 | 7. Configure secrets management for sensitive configuration
394 | 8. Implement proper security controls (ECR scanning, networking)
395 |
396 | The commands should address:
397 | - Proper networking design with security groups and VPC settings
398 | - Auto-scaling based on CPU, memory, and custom metrics
399 | - CI/CD pipeline integration for container deployment
400 | - Health checks and graceful deployment strategies
401 | - Container image security scanning and validation
402 | - Efficient resource utilization and cost management
403 | - High availability across multiple availability zones
404 | - Secrets and environment variable management
405 |
406 | Include validation commands to verify successful deployment and access."""
407 |
408 | @mcp.prompt(name="vpc_network_design", description="Generate AWS CLI commands to design and deploy a secure VPC")
409 | def vpc_network_design(vpc_name: str, cidr_block: str = "10.0.0.0/16") -> str:
410 | """Generate AWS CLI commands to design and deploy a secure VPC.
411 |
412 | Args:
413 | vpc_name: Name for the VPC
414 | cidr_block: CIDR block for the VPC (e.g., "10.0.0.0/16")
415 |
416 | Returns:
417 | Formatted prompt string for VPC design
418 | """
419 | return f"""Generate AWS CLI commands to design and deploy a secure, well-architected VPC
420 | named {vpc_name} with CIDR block {cidr_block} following AWS networking best practices.
421 |
422 | Include commands to:
423 | 1. Create the VPC with appropriate DNS and tenancy settings
424 | 2. Set up public and private subnets across multiple availability zones
425 | 3. Configure Internet Gateway, NAT Gateways, and route tables
426 | 4. Implement Network ACLs and security groups with least-privilege rules
427 | 5. Set up VPC endpoints for AWS services to improve security
428 | 6. Configure VPC Flow Logs for network traffic monitoring
429 | 7. Implement Transit Gateway or VPC Peering if needed
430 | 8. Set up DNS management with Route 53
431 |
432 | The VPC design should include:
433 | - High availability across at least 3 availability zones
434 | - Secure subnet segmentation (public, private, data)
435 | - Proper CIDR block allocation for future expansion
436 | - Security controls at multiple layers (NACLs, security groups)
437 | - Efficient routing and traffic flow optimization
438 | - Private connectivity to AWS services using endpoints
439 | - Network traffic monitoring and logging
440 | - Disaster recovery considerations
441 |
442 | Include validation commands to verify the network connectivity and security."""
443 |
444 | @mcp.prompt(name="infrastructure_automation", description="Generate AWS CLI commands for infrastructure automation")
445 | def infrastructure_automation(resource_type: str, automation_scope: str = "deployment") -> str:
446 | """Generate AWS CLI commands for infrastructure automation.
447 |
448 | Args:
449 | resource_type: Type of AWS resource to automate (e.g., ec2, rds, lambda)
450 | automation_scope: Type of automation (e.g., "deployment", "scaling", "patching")
451 |
452 | Returns:
453 | Formatted prompt string for infrastructure automation
454 | """
455 | return f"""Generate AWS CLI commands to implement {automation_scope} automation
456 | for {resource_type} resources using AWS Systems Manager, CloudFormation, and EventBridge.
457 |
458 | Include commands to:
459 | 1. Create automation documents or CloudFormation templates for consistent {automation_scope}
460 | 2. Set up EventBridge rules to trigger automation on schedule or event patterns
461 | 3. Configure necessary IAM roles and permissions with least privilege
462 | 4. Implement parameter validation and error handling in automation scripts
463 | 5. Set up notification and reporting for automation results
464 | 6. Create maintenance windows and safe deployment practices
465 | 7. Implement automated rollback mechanisms for failures
466 | 8. Configure cross-account or cross-region automation if needed
467 |
468 | The automation solution should:
469 | - Minimize manual intervention while maintaining appropriate approvals
470 | - Include proper logging and audit trails for all activities
471 | - Handle edge cases and failure scenarios gracefully
472 | - Scale to manage multiple resources efficiently
473 | - Follow infrastructure as code best practices
474 | - Include proper testing and validation steps
475 | - Respect maintenance windows and business hours
476 | - Provide detailed reporting and status tracking
477 |
478 | Include commands to validate the automation and test it in a controlled environment."""
479 |
480 | @mcp.prompt(name="security_posture_assessment", description="Generate AWS CLI commands for comprehensive security posture assessment")
481 | def security_posture_assessment() -> str:
482 | """Generate AWS CLI commands for comprehensive security posture assessment.
483 |
484 | Returns:
485 | Formatted prompt string for security assessment
486 | """
487 | return """Generate AWS CLI commands to perform a comprehensive security posture assessment
488 | across your AWS environment using Security Hub, IAM Access Analyzer, and GuardDuty.
489 |
490 | Include commands to:
491 | 1. Enable and configure AWS Security Hub with appropriate standards
492 | 2. Setup AWS Config for resource configuration monitoring
493 | 3. Enable GuardDuty for threat detection across all regions
494 | 4. Configure IAM Access Analyzer to identify external access
495 | 5. Review CloudTrail for complete activity logging coverage
496 | 6. Assess S3 bucket policies and access controls
497 | 7. Analyze password policies and MFA implementation
498 | 8. Evaluate network security groups and NACLs
499 |
500 | The assessment should check for:
501 | - Identity and access management best practices
502 | - Data protection mechanisms and encryption
503 | - Infrastructure security configurations
504 | - Detective controls and logging completeness
505 | - Compliance with industry standards (CIS, NIST, PCI)
506 | - Privileged access management
507 | - Potential lateral movement paths
508 | - Security monitoring and incident response readiness
509 |
510 | Include commands to generate comprehensive reports of findings organized by severity,
511 | and provide remediation steps for common security issues."""
512 |
513 | @mcp.prompt(name="performance_tuning", description="Generate AWS CLI commands for performance tuning of AWS resources")
514 | def performance_tuning(service: str, resource_id: str) -> str:
515 | """Generate AWS CLI commands for performance tuning of AWS resources.
516 |
517 | Args:
518 | service: AWS service to optimize (e.g., rds, ec2, lambda)
519 | resource_id: ID of the specific resource to tune
520 |
521 | Returns:
522 | Formatted prompt string for performance tuning
523 | """
524 | return f"""Generate AWS CLI commands to analyze and tune the performance of {service}
525 | resource {resource_id} based on metrics, benchmarks, and AWS best practices.
526 |
527 | Include commands to:
528 | 1. Gather detailed performance metrics using CloudWatch over various time periods
529 | 2. Analyze resource configuration and compare to recommended settings
530 | 3. Identify performance bottlenecks and resource constraints
531 | 4. Modify configuration parameters for optimal performance
532 | 5. Implement caching strategies if applicable
533 | 6. Adjust scaling policies and resource allocation
534 | 7. Configure enhanced monitoring for detailed insights
535 | 8. Benchmark performance before and after changes
536 |
537 | The performance tuning approach should:
538 | - Establish baseline performance metrics before changes
539 | - Target specific performance issues with measured approaches
540 | - Consider workload patterns and usage characteristics
541 | - Balance performance improvements with cost implications
542 | - Implement changes in staged approach with validation
543 | - Document performance gains and configuration changes
544 | - Address both immediate bottlenecks and long-term scaling
545 |
546 | Include commands to verify performance improvements and monitor for regressions."""
547 |
548 | @mcp.prompt(name="multi_account_governance", description="Generate AWS CLI commands to implement multi-account governance")
549 | def multi_account_governance(account_type: str = "organization") -> str:
550 | """Generate AWS CLI commands to implement multi-account governance.
551 |
552 | Args:
553 | account_type: Type of account structure (e.g., "organization", "control tower")
554 |
555 | Returns:
556 | Formatted prompt string for multi-account governance
557 | """
558 | return f"""Generate AWS CLI commands to implement robust multi-account governance
559 | using AWS Organizations, Control Tower, and {account_type} best practices.
560 |
561 | Include commands to:
562 | 1. Set up organizational units (OUs) with logical account grouping
563 | 2. Implement service control policies (SCPs) for security guardrails
564 | 3. Configure centralized logging with CloudTrail and CloudWatch Logs
565 | 4. Set up cross-account IAM roles with least privilege
566 | 5. Implement tag policies and resource tagging strategies
567 | 6. Configure AWS Config for multi-account compliance monitoring
568 | 7. Set up centralized security monitoring with Security Hub
569 | 8. Implement account baselining and standardization
570 |
571 | The governance framework should address:
572 | - Preventative guardrails using SCPs and permission boundaries
573 | - Detective controls with centralized logging and monitoring
574 | - Cost management and billing consolidation
575 | - Standardized network architecture across accounts
576 | - Identity federation and cross-account access
577 | - Centralized audit and compliance reporting
578 | - Automated account provisioning and baseline configuration
579 | - Resource sharing and cross-account service usage
580 |
581 | Include guidance on implementing a secure landing zone and account structure."""
582 |
583 | logger.info("Successfully registered all AWS prompt templates")
584 |
```