# Directory Structure

```
├── .github
│   ├── pull_request_template.md
│   └── workflows
│       └── pre-commit.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docs
│   ├── ai-integration.md
│   ├── architecture.md
│   ├── assets
│   │   ├── claude-desktop-settings.png
│   │   ├── claude-server-disconnected.png
│   │   └── Log-Analyzer-with-MCP-arch.png
│   ├── aws-config.md
│   ├── features.md
│   ├── troubleshooting.md
│   └── usage.md
├── LICENSE
├── NOTICE
├── pyproject.toml
├── README.md
├── src
│   ├── client.py
│   └── cw-mcp-server
│       ├── __init__.py
│       ├── resources
│       │   ├── __init__.py
│       │   └── cloudwatch_logs_resource.py
│       ├── server.py
│       └── tools
│           ├── __init__.py
│           ├── analysis_tools.py
│           ├── correlation_tools.py
│           ├── search_tools.py
│           └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12
```

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.10
    hooks:
      - id: ruff-check
        args: [--fix]
      - id: ruff-format
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[co]
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo
*~

# Testing
.coverage
htmlcov/
.tox/
.nox/
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# ruff
.ruff_cache/

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# OS specific
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Logs
*.log
logs/
log/

# Local development
.env
.env.local
.env.*.local

# Misc
*.bak
*.tmp
*.temp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

````markdown
# Log Analyzer with MCP

A [Model Context Protocol (MCP)](https://modelcontextprotocol.io) server that provides AI assistants access to AWS CloudWatch Logs for analysis, searching, and correlation.

## 🏗️ Architecture
![Architecture Diagram](./docs/assets/Log-Analyzer-with-MCP-arch.png)

## 🔌 Model Context Protocol (MCP)

As outlined by Anthropic:
> MCP is an open protocol that standardizes how applications provide context to LLMs. Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect your devices to various peripherals and accessories, MCP provides a standardized way to connect AI models to different data sources and tools.

This repository contains an example client and server that allow an AI assistant like Claude to interact with CloudWatch Logs in an AWS account. To learn more about MCP, read through the [introduction](https://modelcontextprotocol.io/introduction).

## ✨ Features

- Browse and search CloudWatch Log Groups
- Search logs using CloudWatch Logs Insights query syntax
- Generate log summaries and identify error patterns
- Correlate logs across multiple AWS services
- AI-optimized tools for assistants like Claude

[Detailed feature list](./docs/features.md)

## 🚀 Installation

### Prerequisites

- The [uv](https://github.com/astral-sh/uv) Python package and project manager
- An AWS account with CloudWatch Logs
- Configured [AWS credentials](./docs/aws-config.md)

### Setup

```bash
# Clone the repository
git clone https://github.com/awslabs/Log-Analyzer-with-MCP.git
cd Log-Analyzer-with-MCP

# Create a virtual environment and install dependencies
uv sync
source .venv/bin/activate  # On Windows, use `.venv\Scripts\activate`
```

## 🚦 Quick Start

1. Make sure you have configured your AWS credentials as [described here](./docs/aws-config.md)

2. Update your `claude_desktop_config.json` file with the configuration outlined in the [AI integration guide](./docs/ai-integration.md)

3. Open Claude for Desktop and start chatting!

For more examples and advanced usage, see the [detailed usage guide](./docs/usage.md).

## 🤖 AI Integration

This project can be easily integrated with AI assistants like Claude for Desktop. See the [AI integration guide](./docs/ai-integration.md) for details.

## 📚 Documentation

- [Detailed Features](./docs/features.md)
- [Usage Guide](./docs/usage.md)
- [AWS Configuration](./docs/aws-config.md)
- [Architecture Details](./docs/architecture.md)
- [AI Integration](./docs/ai-integration.md)
- [Troubleshooting](./docs/troubleshooting.md)

## 🔒 Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

## 📄 License

This project is licensed under the Apache-2.0 License.
````

--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------

```markdown
## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
[email protected] with any additional questions or comments.
```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing Guidelines

Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional
documentation, we greatly value feedback and contributions from our community.

Please read through this document before submitting any issues or pull requests to ensure we have all the necessary
information to effectively respond to your bug report or contribution.


## Reporting Bugs/Feature Requests

We welcome you to use the GitHub issue tracker to report bugs or suggest features.

When filing an issue, please check existing open, or recently closed, issues to make sure somebody else hasn't already
reported the issue. Please try to include as much information as you can. Details like these are incredibly useful:

* A reproducible test case or series of steps
* The version of our code being used
* Any modifications you've made relevant to the bug
* Anything unusual about your environment or deployment


## Contributing via Pull Requests
Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that:

1. You are working against the latest source on the *main* branch.
2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already.
3. You open an issue to discuss any significant work - we would hate for your time to be wasted.

To send us a pull request, please:

1. Fork the repository.
2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
3. Ensure local tests pass.
4. Commit to your fork using clear commit messages.
5. Send us a pull request, answering any default questions in the pull request interface.
6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.

GitHub provides additional documentation on [forking a repository](https://help.github.com/articles/fork-a-repo/) and
[creating a pull request](https://help.github.com/articles/creating-a-pull-request/).


## Finding contributions to work on
Looking at the existing issues is a great way to find something to contribute to. As our projects, by default, use the default GitHub issue labels (enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any 'help wanted' issues is a great place to start.


## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
[email protected] with any additional questions or comments.


## Security issue notifications
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue.


## Licensing

See the [LICENSE](LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution.
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/resources/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "Log-Analyzer-with-MCP"
version = "0.1.0"
description = "An MCP server that provides AI assistants access to CloudWatch Logs"
readme = "README.md"
requires-python = ">=3.12"
authors = [
  {name = "Aditya Addepalli", email = "[email protected]"},
]
dependencies = [
    "boto3>=1.37.11",
    "mcp[cli]>=1.6.0",
]

[dependency-groups]
dev = [
    "ruff>=0.11.10",
]
```

--------------------------------------------------------------------------------
/.github/workflows/pre-commit.yml:
--------------------------------------------------------------------------------

```yaml
name: pre-commit

on:
  pull_request:
  push:
    branches: [main]
permissions: {}
jobs:
  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"
      - name: pre-commit setup
        run: pip install pre-commit
      - name: run linter
        run: pre-commit run --all-files
```

--------------------------------------------------------------------------------
/docs/features.md:
--------------------------------------------------------------------------------

```markdown
# Detailed Features

## 🔍 Log Group Discovery
- Browse available CloudWatch Log Groups
- Filter log groups by name prefix
- Paginate through large sets of log groups

## 🔎 Search Capabilities
- Search for specific error patterns or anomalies in CloudWatch Logs
- Apply CloudWatch Logs Insights query syntax for powerful filtering
- Limit results to specific time ranges

## 📊 Log Analysis
- Generate summaries of log activity over time
- Identify common error patterns and exceptions
- Analyze log structure and format automatically
- View frequency of different error types

## 🔄 Cross-Service Correlation
- Correlate logs across multiple AWS services
- Track request flows through distributed systems
- Identify related events across different log groups

## 🤖 AI-Optimized Tools
- Direct tool access for AI assistants
- Specialized prompts for guided log analysis
- Structured data responses for easy consumption
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/__init__.py:
--------------------------------------------------------------------------------

```python
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import functools
import json
import traceback
from typing import Callable


def handle_exceptions(func: Callable) -> Callable:
    """
    Decorator for handling exceptions in tool methods.
    Ensures that all tool methods return a standardized error response
    rather than raising exceptions that would cause the client to fail.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error_traceback = traceback.format_exc()
            error_response = {
                "status": "Error",
                "error": str(e),
                "error_type": e.__class__.__name__,
                "details": error_traceback.split("\n")[-2] if error_traceback else None,
            }
            return json.dumps(error_response, indent=2)

    return wrapper
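
# Illustrative usage sketch (not part of the original module): any async tool
# method decorated this way returns a JSON error payload instead of raising.
#
#   @handle_exceptions
#   async def flaky_tool():
#       raise RuntimeError("boom")
#
#   # await flaky_tool() -> '{"status": "Error", "error": "boom", ...}'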
```

--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------

```markdown
<!-- markdownlint-disable MD041 MD043 -->
**Issue number:**

## Summary

### Changes

> Please provide a summary of what's being changed

### User experience

> Please share what the user experience looks like before and after this change

## Checklist

If an item doesn't apply to your change, leave it unchecked.

* [ ] I have reviewed the [contributing guidelines](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/CONTRIBUTING.md)
* [ ] I have performed a self-review of this change
* [ ] Changes have been tested
* [ ] Changes are documented

<details>
<summary>Is this a breaking change?</summary>

**RFC issue number**:

Checklist:

* [ ] Migration process documented
* [ ] Implement warnings (if it can live side by side)

</details>

## Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of the [project license](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/LICENSE).
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/utils.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from datetime import datetime, timedelta
import dateutil.parser


def get_time_range(hours: int, start_time: str = None, end_time: str = None):
    """
    Calculate time range timestamps from hours or exact start/end times.

    Args:
        hours: Number of hours to look back (used if start_time is not provided)
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time

    Returns:
        Tuple of (start_timestamp, end_timestamp) in milliseconds since epoch
    """
    if start_time:
        start_ts = int(dateutil.parser.isoparse(start_time).timestamp() * 1000)
    else:
        start_ts = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    if end_time:
        end_ts = int(dateutil.parser.isoparse(end_time).timestamp() * 1000)
    else:
        end_ts = int(datetime.now().timestamp() * 1000)

    return start_ts, end_ts
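
# Illustrative usage sketch (not part of the original module); the values are
# placeholders:
#
#   start_ts, end_ts = get_time_range(hours=6)  # last 6 hours, ending now
#   start_ts, end_ts = get_time_range(
#       0, "2024-01-01T00:00:00Z", "2024-01-01T06:00:00Z"
#   )  # explicit ISO8601 bounds take precedence over `hours`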
```

--------------------------------------------------------------------------------
/docs/architecture.md:
--------------------------------------------------------------------------------

````markdown
# 🏗️ Architecture Details

The following diagram illustrates the high-level architecture of the MCP CloudWatch Log Analyzer:

![Architecture Diagram](./assets/Log-Analyzer-with-MCP-arch.png)

The architecture consists of three main components:

## 💻 Client Side
- AWS Credentials are configured on the client machine
- The local computer runs the MCP client applications
- The MCP Server runs locally and manages the communication

## ☁️ AWS Cloud
- The CloudWatch service provides the log data and search capabilities

## 🔄 Data Flow
- AWS Credentials flow from configuration to the client
- The client communicates with CloudWatch through the MCP Server
- The MCP Server mediates all interactions with AWS services

The project follows the Model Context Protocol architecture:

## 📚 Resources
Expose CloudWatch log groups, streams, and events as addressable URIs (a handler sketch follows the list):
- `logs://groups` - List all log groups
- `logs://groups/{log_group_name}/streams` - List streams for a specific group
- `logs://groups/{log_group_name}/streams/{log_stream_name}` - Get events from a stream
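
As an illustration of this pattern, a URI template binds to a decorated handler in the server. Below is a minimal sketch using the `mcp` Python SDK's `FastMCP` API; the handler name and body are illustrative, not the project's actual implementation:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("cw-mcp-server")


# The {log_group_name} placeholder in the URI template is bound to the
# function parameter of the same name when a client reads the resource.
@mcp.resource("logs://groups/{log_group_name}/streams")
async def get_log_streams(log_group_name: str) -> str:
    """Return the streams for a log group as a JSON string."""
    ...
```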

## 🧰 Tools
Provide functionality for log analysis, search, and correlation:
- `list_log_groups` - List and filter available log groups
- `search_logs` - Search logs with CloudWatch Insights queries
- `summarize_log_activity` - Generate time-based activity summaries
- `find_error_patterns` - Identify common error patterns
- `correlate_logs` - Find related events across multiple log groups

## 💬 Prompts
Guide AI assistants through common workflows:
- `list_cloudwatch_log_groups` - Help explore available log groups
- `analyze_cloudwatch_logs` - Guide the log analysis process

## 🖥️ Server
Handles MCP protocol communication and AWS API integration:
- Manages API access to CloudWatch Logs
- Handles asynchronous CloudWatch Logs Insights queries
- Provides structured data responses

## 📱 Client
Command-line interface for interacting with the server:
- Parses commands and arguments
- Connects to the server via stdio
- Formats JSON responses for human readability
````

--------------------------------------------------------------------------------
/docs/aws-config.md:
--------------------------------------------------------------------------------

````markdown
# 🔐 AWS Configuration Guide

For the MCP server to access your AWS CloudWatch Logs, you need to configure AWS credentials, which you can learn how to do [here](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html). The server uses boto3's credential resolution chain, which checks several locations in the following order:

1. **Environment variables**:
   ```bash
   export AWS_ACCESS_KEY_ID="your-access-key"
   export AWS_SECRET_ACCESS_KEY="your-secret-key"
   export AWS_REGION="us-east-1"
   ```

2. **Shared credential file** (`~/.aws/credentials`):
   ```ini
   [default]
   aws_access_key_id = your-access-key
   aws_secret_access_key = your-secret-key
   ```

   If you're seeing errors like `An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied`, make sure to add your credentials in this format:
   ```ini
   [default]
   aws_access_key_id = your-access-key
   aws_secret_access_key = your-secret-key

   # For temporary credentials, add the session token
   [temp-profile]
   aws_access_key_id = your-temp-access-key
   aws_secret_access_key = your-temp-secret-key
   aws_session_token = your-session-token
   ```

   Check out the [troubleshooting guide](./troubleshooting.md) for more information.

3. **AWS config file** (`~/.aws/config`):
   ```ini
   [default]
   region = us-east-1
   ```

You can set up your AWS credentials using the AWS CLI:

```bash
aws configure
```

## Using a Specific AWS Profile or Region

1. **Server Start-up**

   If you have multiple AWS profiles or want to specify a region, use:

   ```bash
   python src/cw-mcp-server/server.py --profile your-profile-name --region us-west-2
   ```

2. **Per-Call Override**

   Override the profile or region on individual AI prompts or tool calls:

   > Example: Get a list of CloudWatch log groups using the "dev-account" profile in the "eu-central-1" region.

   Once you set a profile or region, the LLM keeps using it for follow-ups. Only specify a new profile or region when you need to switch accounts or regions.

This is useful when you need to access CloudWatch logs in different AWS accounts or regions, as the sketch below illustrates.
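
To see what the override means in code, here is a minimal boto3 sketch of the same profile/region resolution the server performs; the profile and region names are placeholders:

```python
import boto3

# Named profile and explicit region, mirroring `--profile` / `--region`.
# Omit both to fall back to boto3's default credential resolution chain.
session = boto3.Session(profile_name="dev-account", region_name="eu-central-1")
logs = session.client("logs")

# Quick sanity check that the resolved credentials can read CloudWatch Logs.
for group in logs.describe_log_groups(limit=5)["logGroups"]:
    print(group["logGroupName"])
```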

## 🛡️ Required Permissions

The MCP server requires permissions to access CloudWatch Logs. At minimum, ensure your IAM user or role has the following policy:
- `CloudWatchLogsReadOnlyAccess`
````

--------------------------------------------------------------------------------
/docs/troubleshooting.md:
--------------------------------------------------------------------------------

````markdown
# 🔧 Troubleshooting Guide

There are various issues you may run into while setting this up. Here are some troubleshooting tips:

## ⚠️ Common Issues

**Server Disconnected**:
```
MCP cw-mcp-server: Server Disconnected.
```
1. Ensure the JSON file described in the [AI integration guide](./ai-integration.md) is configured properly.
2. Ensure you've set up your AWS credentials properly according to the [AWS configuration guide](./aws-config.md).

**Authentication Errors**:
```
Error: An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied
```
Ensure your AWS credentials are properly configured and have the necessary permissions to access CloudWatch Logs:

1. Check if your credentials file exists:
   ```bash
   cat ~/.aws/credentials
   ```

2. Verify you have the required permissions (CloudWatchLogsReadOnlyAccess) for the assumed role you are using.

3. If using temporary credentials, ensure your session token is included in your `~/.aws/credentials` file:
   ```ini
   [profile-name]
   aws_access_key_id = your-temp-access-key
   aws_secret_access_key = your-temp-secret-key
   aws_session_token = your-session-token
   ```

4. Test your credentials directly with the AWS CLI:
   ```bash
   aws logs describe-log-groups
   ```

**Resource Not Found**:
```
Error: An error occurred (ResourceNotFoundException) when calling the GetLogEvents operation
```
Check that the log group and stream names are correct. Log stream names are case sensitive.

**Connection Issues**:
```
Error: Failed to connect to MCP server
```
Verify that the server is running and accessible. Check file paths in your `claude_desktop_config.json` or client configuration.

**Query Timeout**:
```
"status": "Timeout", "error": "Search query failed to complete"
```
For complex queries or large log volumes, try reducing the time range using the `--hours` parameter.

**Claude terminating request**:

This could be due to a query timeout or an invalid response from CloudWatch (for example, performing operations on a non-existent log group).

In this case, check the server logs via the Claude Desktop settings:

![Claude Desktop Settings](./assets/claude-desktop-settings.png)

Then click `Open Logs Folder` and open the `mcp-server-cw-mcp-server.log` file to see more details.

**Amazon Q CLI terminating request**:

If there are issues with your configuration, Amazon Q CLI will start (without any MCP server tools) with an error similar to:
```
WARNING: Error reading global mcp config: expected value at line 9 column 19
Please check to make sure config is correct. Discarding.
```
You might also see timeout issues if it is struggling to find and download what you have configured in your `mcp.json`, for example:
```
x mcp_server has failed to load:
- Operation timed out: recv for initialization
- run with Q_LOG_LEVEL=trace and see $TMPDIR/qlog for detail
x 0 of 1 mcp servers initialized
```
You can look in `$TMPDIR/qlog` for the various logs generated, and you can set `Q_LOG_LEVEL` to `trace`, `debug`, `info`, or `warn` to get debug output that helps you troubleshoot any issues you run into.


## 🆘 Getting Help

If you encounter issues not covered in this troubleshooting section, please:

1. Check the server logs for detailed error messages
2. Verify your AWS permissions and configuration

If you're still facing issues, please open a GitHub Issue.
````

--------------------------------------------------------------------------------
/docs/ai-integration.md:
--------------------------------------------------------------------------------

````markdown
# AI Integration Guide

## 🖥️ Claude Desktop Integration

You can add the configuration for the MCP server to Claude for Desktop for AI-assisted log analysis.

To get Claude for Desktop and learn how to add an MCP server, see [this guide](https://modelcontextprotocol.io/quickstart/user). Then add this to your respective JSON file:

```json
{
  "mcpServers": {
    "cw-mcp-server": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server",
        "run",
        "server.py"
        // Optionally add "--profile", "your-profile" and/or "--region", "us-west-2" here;
        // otherwise the server falls back to your default AWS credentials and region
      ]
    }
  }
}
```

## 🤖 Amazon Q CLI Integration

Amazon Q CLI acts as an MCP client. To connect to MCP servers and access the tools they surface, you need to create a configuration file called `mcp.json` in your Amazon Q configuration directory.

Your directory structure should look like this:

```bash
~/.aws
└── amazonq
    ├── mcp.json
    ├── profiles
    ├── cache
    ├── history
    └── prompts
```

If `mcp.json` is empty, add this MCP server configuration:

```json
{
  "mcpServers": {
    "cw-mcp-server": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server",
        "run",
        "server.py"
        // Optionally add "--profile", "your-profile" and/or "--region", "us-west-2" here;
        // otherwise the server falls back to your default AWS credentials and region
      ]
    }
  }
}
```

### Testing the configuration
Every time you start Amazon Q CLI, it will attempt to load any configured MCP servers. You should see output indicating that the MCP server has been discovered and initialized.

![image](https://github.com/user-attachments/assets/9acc1632-5a9a-4465-9fdc-a8464640f6a6)

If you're running into issues, check out the [troubleshooting guide](./troubleshooting.md) or open a GitHub Issue.

## 🔍 AI Assistant Capabilities

With the enhanced tool support, AI assistants can now:

1. **Discover Log Groups**:
   - "Show me all my CloudWatch log groups"
   - "List log groups that start with /aws/lambda"
   - "Show me the next page of log groups"

2. **Understand Log Structure**:
   - "Analyze the structure of my API Gateway logs"
   - "What fields are common in these JSON logs?"
   - "Show me a sample of recent logs from this group"

3. **Diagnose Issues**:
   - "Find all errors in my Lambda logs from the past 24 hours"
   - "What's the most common error pattern in this log group?"
   - "Show me logs around the time this service crashed"

4. **Perform Analysis**:
   - "Compare log volumes between these three services"
   - "Find correlations between errors in my database and API logs"
   - "Analyze the trend of timeouts in my Lambda function"

> You can specify a different AWS profile or region in your prompt, e.g. "Show me all my CloudWatch log groups using <profile_name> profile in <region> region"

## 💬 AI Prompt Templates

The server provides specialized prompts that AI assistants can use:

1. **List and Explore Log Groups Prompt**:
   ```
   I'll help you explore the CloudWatch log groups in your AWS environment.
   First, I'll list the available log groups...
   ```

2. **Log Analysis Prompt**:
   ```
   Please analyze the following CloudWatch logs from the {log_group_name} log group.
   First, I'll get you some information about the log group...
   ```

3. **Profile/Region Override**:
   ```
   I'll help you list CloudWatch log groups using the <profile_name> profile in the <region> region. Let me do that for you:
   ```
````

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/correlation_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import boto3
import json
import time
from datetime import datetime
from typing import List

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsCorrelationTools:
    """Tools for correlating logs across multiple CloudWatch Log groups."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def correlate_logs(
        self,
        log_group_names: List[str],
        search_term: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Correlate logs across multiple AWS services using a common search term.

        Args:
            log_group_names: List of log group names to search
            search_term: Term to search for in logs (request ID, transaction ID, etc.)
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with correlated events
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Validate inputs
        if not log_group_names:
            return json.dumps(
                {"status": "Error", "error": "No log groups specified"}, indent=2
            )

        if not search_term:
            return json.dumps(
                {"status": "Error", "error": "No search term specified"}, indent=2
            )

        # Results dictionary
        results = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "searchTerm": search_term,
            "logGroups": {},
            "correlatedEvents": [],
        }

        # Get relevant logs from each group
        for log_group_name in log_group_names:
            # Use CloudWatch Logs Insights query
            query = f"""
            filter @message like "{search_term}"
            | sort @timestamp asc
            | limit 100
            """

            # Start the query
            query_start_time = time.time()
            start_query_response = self.logs_client.start_query(
                logGroupName=log_group_name,
                startTime=start_ts,
                endTime=end_ts,
                queryString=query,
            )

            query_id = start_query_response["queryId"]

            # Poll for query results
            response = None
            while response is None or response["status"] == "Running":
                await asyncio.sleep(1)  # Wait before checking again
                response = self.logs_client.get_query_results(queryId=query_id)

                # Avoid long-running queries
                if response["status"] == "Running":
                    # Check if we've been running too long (60 seconds)
                    if time.time() - query_start_time > 60:
                        response = {"status": "Timeout", "results": []}
                        break

            # Process results for this log group
            log_group_events = []

            for result in response.get("results", []):
                event = {"logGroup": log_group_name, "timestamp": None, "message": None}

                for field in result:
                    if field["field"] == "@timestamp":
                        event["timestamp"] = field["value"]
                    elif field["field"] == "@message":
                        event["message"] = field["value"]
                    elif field["field"] == "@logStream":
                        event["logStream"] = field["value"]

                if event["timestamp"] and event["message"]:
                    log_group_events.append(event)
                    results["correlatedEvents"].append(event)

            # Store events for this log group
            results["logGroups"][log_group_name] = {
                "eventCount": len(log_group_events),
                "events": log_group_events,
            }

        # Sort all correlated events by timestamp
        results["correlatedEvents"] = sorted(
            results["correlatedEvents"], key=lambda x: x.get("timestamp", "")
        )

        return json.dumps(results, indent=2)
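
# Illustrative usage sketch (not part of the original module); the log group
# names and search term are placeholders:
#
#   import asyncio
#   tools = CloudWatchLogsCorrelationTools()
#   print(asyncio.run(tools.correlate_logs(
#       ["/aws/lambda/api", "/aws/lambda/worker"], "req-abc123", hours=6)))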
```

--------------------------------------------------------------------------------
/docs/usage.md:
--------------------------------------------------------------------------------

````markdown
# Detailed Usage Guide

## 🌐 Integration with MCP Clients (Claude for Desktop, Cursor, Windsurf, etc.) - Recommended

AI assistants can leverage this MCP server. To learn more, check out the [AI Integration Guide](./ai-integration.md).

## 🖥️ Running the Standalone Server Directly

The MCP server exposes CloudWatch Logs data and analysis tools to AI assistants and MCP clients:

```bash
python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2]
```

The server runs in the foreground by default. To run it in the background, you can use:

```bash
python src/cw-mcp-server/server.py &
```

[Amazon Bedrock AgentCore requires stateless streamable-HTTP servers](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-mcp.html#runtime-mcp-how-it-works) because the Runtime provides session isolation by default. The platform automatically adds a `Mcp-Session-Id` header for any request without it, so MCP clients can maintain connection continuity to the same Amazon Bedrock AgentCore Runtime session.

The server runs in stateful mode by default. To run it in stateless mode, you can use:

```bash
python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2] --stateless
```

## 📟 CLI Client (one-off usage)

The project includes a command-line client for interacting with the MCP server:

```bash
# List available log groups
python src/client.py list-groups [--profile your-profile] [--region us-west-2]

# List log groups with a prefix filter
python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2]

# Use the tool interface instead of the resource
python src/client.py list-groups --use-tool [--region us-west-2]

# Get a prompt for exploring log groups
python src/client.py list-prompt [--region us-west-2]

# List log streams in a specific log group
python src/client.py list-streams "/aws/lambda/my-function" [--region us-west-2]

# Get log events from a specific stream
python src/client.py get-events "/aws/lambda/my-function" "2023/06/01/[$LATEST]abcdef123456" [--region us-west-2]

# Get a sample of recent logs
python src/client.py sample "/aws/lambda/my-function" [--region us-west-2]

# Get recent errors
python src/client.py recent-errors "/aws/lambda/my-function" [--region us-west-2]

# Get log structure analysis
python src/client.py structure "/aws/lambda/my-function" [--region us-west-2]

# Search logs for a specific pattern
python src/client.py search "/aws/lambda/my-function" "filter @message like 'error'" [--region us-west-2]

# Generate a summary of log activity
python src/client.py summarize "/aws/lambda/my-function" --hours 48 [--region us-west-2]

# Find common error patterns
python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2]

# Correlate logs across multiple services
python src/client.py correlate "/aws/lambda/service1" "/aws/lambda/service2" "OrderId: 12345" [--region us-west-2]
```

*You can use `--profile` and `--region` with any command to target a specific AWS account or region.*

## 🧩 Example Workflows

### Finding and analyzing errors in a Lambda function using the standalone server directly

```bash
# 1. List your log groups to find the Lambda function
python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2]

# 2. Generate a summary to see when errors occurred
python src/client.py summarize "/aws/lambda/my-function" --hours 24 [--region us-west-2]

# 3. Find the most common error patterns
python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2]

# 4. Search for details about a specific error
python src/client.py search "/aws/lambda/my-function" "filter @message like 'ConnectionError'" [--region us-west-2]
```

### Correlating requests across microservices using the standalone server directly

```bash
# Track a request ID across multiple services
python src/client.py correlate \
  "/aws/lambda/api-gateway" \
  "/aws/lambda/auth-service" \
  "/aws/lambda/payment-processor" \
  "req-abc123" [--region us-west-2]
```

## 🔗 Resource URIs

The MCP server exposes CloudWatch Logs data through the following resource URIs; a programmatic read sketch follows the table:

| Resource URI | Description |
|--------------|-------------|
| `logs://groups` | List all log groups |
| `logs://groups/filter/{prefix}` | List log groups filtered by prefix |
| `logs://groups/{log_group_name}` | Get details about a specific log group |
| `logs://groups/{log_group_name}/streams` | List streams for a log group |
| `logs://groups/{log_group_name}/streams/{log_stream_name}` | Get events from a specific log stream |
| `logs://groups/{log_group_name}/sample` | Get a sample of recent logs |
| `logs://groups/{log_group_name}/recent-errors` | Get recent errors from a log group |
| `logs://groups/{log_group_name}/metrics` | Get log metrics (volume, frequency) |
| `logs://groups/{log_group_name}/structure` | Analyze log format and structure |

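These URIs can also be read programmatically by any MCP client. A minimal sketch using the `mcp` Python SDK over stdio; the server path is a placeholder, and the exact SDK surface may vary between versions:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

server = StdioServerParameters(command="python", args=["src/cw-mcp-server/server.py"])


async def main() -> None:
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Read one of the resource URIs from the table above
            result = await session.read_resource("logs://groups")
            print(result)


asyncio.run(main())
```
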
## 🧰 Tool Handlers

The server provides the following tool handlers for AI assistants; a call sketch follows the table:

| Tool | Description |
|------|-------------|
| `list_log_groups` | List available CloudWatch log groups with filtering options |
| `search_logs` | Execute CloudWatch Logs Insights queries on a single log group |
| `search_logs_multi` | Execute CloudWatch Logs Insights queries across multiple log groups |
| `filter_log_events` | Filter logs by pattern across all streams |
| `summarize_log_activity` | Generate time-based activity summaries |
| `find_error_patterns` | Identify common error patterns |
| `correlate_logs` | Find related events across multiple log groups |
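
Tool handlers are invoked over the same session via `call_tool`. A short sketch, assuming an initialized `session` as in the resource example above; the argument names follow the tool signatures in `src/cw-mcp-server/tools/search_tools.py`:

```python
# Assumes an initialized ClientSession named `session` (see the sketch above).
result = await session.call_tool(
    "search_logs",
    {
        "log_group_name": "/aws/lambda/my-function",
        "query": "filter @message like 'error'",
        "hours": 24,
    },
)
print(result)
```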
````

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/search_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import boto3
import json
import time
from datetime import datetime
from typing import List

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsSearchTools:
    """Tools for searching and querying CloudWatch Logs."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def search_logs(
        self,
        log_group_name: str,
        query: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Search logs using CloudWatch Logs Insights query.

        Args:
            log_group_name: The log group to search
            query: CloudWatch Logs Insights query syntax
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with search results
        """
        return await self.search_logs_multi(
            [log_group_name], query, hours, start_time, end_time
        )

    @handle_exceptions
    async def search_logs_multi(
        self,
        log_group_names: List[str],
        query: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Search logs across multiple log groups using CloudWatch Logs Insights query.

        Args:
            log_group_names: List of log groups to search
            query: CloudWatch Logs Insights query syntax
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with search results
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)
        # Start the query
        query_start_time = time.time()
        start_query_response = self.logs_client.start_query(
            logGroupNames=log_group_names,
            startTime=start_ts,
            endTime=end_ts,
            queryString=query,
            limit=100,
        )
        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)
            elapsed_time = time.time() - query_start_time

            # Avoid long-running queries
            if response["status"] == "Running":
                # Check if we've been running too long (60 seconds)
                if elapsed_time > 60:
                    return json.dumps(
                        {
                            "status": "Timeout",
                            "error": "Search query failed to complete within time limit",
                        },
                        indent=2,
                    )

        # Process and format the results
        formatted_results = {
            "status": response["status"],
            "statistics": response.get("statistics", {}),
            "searchedLogGroups": log_group_names,
            "results": [],
        }

        for result in response.get("results", []):
            result_dict = {}
            for field in result:
                result_dict[field["field"]] = field["value"]
            formatted_results["results"].append(result_dict)

        return json.dumps(formatted_results, indent=2)

    @handle_exceptions
    async def filter_log_events(
        self,
        log_group_name: str,
        filter_pattern: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Filter log events by pattern across all streams in a log group.

        Args:
            log_group_name: The log group to filter
            filter_pattern: The pattern to search for (CloudWatch Logs filter syntax)
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with filtered events
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)
        response = self.logs_client.filter_log_events(
            logGroupName=log_group_name,
            filterPattern=filter_pattern,
            startTime=start_ts,
            endTime=end_ts,
            limit=100,
        )

        events = response.get("events", [])
        formatted_events = []

        for event in events:
            formatted_events.append(
                {
                    "timestamp": datetime.fromtimestamp(
                        event.get("timestamp", 0) / 1000
                    ).isoformat(),
                    "message": event.get("message"),
                    "logStreamName": event.get("logStreamName"),
                }
            )

        return json.dumps(formatted_events, indent=2)
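
# Illustrative usage sketch (not part of the original module); the log group
# name and query string are placeholders:
#
#   import asyncio
#   tools = CloudWatchLogsSearchTools(region_name="us-west-2")
#   print(asyncio.run(tools.search_logs(
#       "/aws/lambda/my-function", "filter @message like 'error'", hours=6)))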
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/analysis_tools.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | 
  3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4 | # SPDX-License-Identifier: Apache-2.0
  5 | 
  6 | import asyncio
  7 | import boto3
  8 | import json
  9 | from datetime import datetime
 10 | 
 11 | from . import handle_exceptions
 12 | from .utils import get_time_range
 13 | 
 14 | 
 15 | class CloudWatchLogsAnalysisTools:
 16 |     """Tools for analyzing CloudWatch Logs data."""
 17 | 
 18 |     def __init__(self, profile_name=None, region_name=None):
 19 |         """Initialize the CloudWatch Logs client.
 20 | 
 21 |         Args:
 22 |             profile_name: Optional AWS profile name to use for credentials
 23 |             region_name: Optional AWS region name to use for API calls
 24 |         """
 25 |         # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
 26 |         self.profile_name = profile_name
 27 |         self.region_name = region_name
 28 |         session = boto3.Session(profile_name=profile_name, region_name=region_name)
 29 |         self.logs_client = session.client("logs")
 30 | 
 31 |     @handle_exceptions
 32 |     async def summarize_log_activity(
 33 |         self,
 34 |         log_group_name: str,
 35 |         hours: int = 24,
 36 |         start_time: str = None,
 37 |         end_time: str = None,
 38 |     ) -> str:
 39 |         """
 40 |         Generate a summary of log activity over a specified time period.
 41 | 
 42 |         Args:
 43 |             log_group_name: The log group to analyze
 44 |             hours: Number of hours to look back
 45 |             start_time: Start time in ISO8601 format
 46 |             end_time: End time in ISO8601 format
 47 | 
 48 |         Returns:
 49 |             JSON string with activity summary
 50 |         """
 51 |         start_ts, end_ts = get_time_range(hours, start_time, end_time)
 52 | 
 53 |         # Use CloudWatch Logs Insights to get a summary
 54 |         query = """
 55 |         stats count(*) as logEvents,
 56 |               count_distinct(stream) as streams
 57 |         | sort @timestamp desc
 58 |         | limit 1000
 59 |         """
 60 | 
 61 |         # Start the query
 62 |         start_query_response = self.logs_client.start_query(
 63 |             logGroupName=log_group_name,
 64 |             startTime=start_ts,
 65 |             endTime=end_ts,
 66 |             queryString=query,
 67 |         )
 68 | 
 69 |         query_id = start_query_response["queryId"]
 70 | 
 71 |         # Poll for query results
 72 |         response = None
 73 |         while response is None or response["status"] == "Running":
 74 |             await asyncio.sleep(1)  # Wait before checking again
 75 |             response = self.logs_client.get_query_results(queryId=query_id)
 76 | 
 77 |         # Get the hourly distribution
 78 |         hourly_query = """
 79 |         stats count(*) as count by bin(1h)
 80 |         | sort @timestamp desc
 81 |         | limit 24
 82 |         """
 83 | 
 84 |         # Start the hourly query
 85 |         hourly_query_response = self.logs_client.start_query(
 86 |             logGroupName=log_group_name,
 87 |             startTime=start_ts,
 88 |             endTime=end_ts,
 89 |             queryString=hourly_query,
 90 |         )
 91 | 
 92 |         hourly_query_id = hourly_query_response["queryId"]
 93 | 
 94 |         # Poll for hourly query results
 95 |         hourly_response = None
 96 |         while hourly_response is None or hourly_response["status"] == "Running":
 97 |             await asyncio.sleep(1)  # Wait before checking again
 98 |             hourly_response = self.logs_client.get_query_results(
 99 |                 queryId=hourly_query_id
100 |             )
101 | 
102 |         # Process the main summary results
103 |         summary = {
104 |             "timeRange": {
105 |                 "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
106 |                 "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
107 |                 "hours": hours,
108 |             },
109 |             "logEvents": 0,
110 |             "uniqueStreams": 0,
111 |             "hourlyDistribution": [],
112 |         }
113 | 
114 |         # Extract the main stats
115 |         for result in response.get("results", []):
116 |             for field in result:
117 |                 if field["field"] == "logEvents":
118 |                     summary["logEvents"] = int(field["value"])
119 |                 elif field["field"] == "streams":
120 |                     summary["uniqueStreams"] = int(field["value"])
121 | 
122 |         # Extract the hourly distribution
123 |         for result in hourly_response.get("results", []):
124 |             hour_data = {}
125 |             for field in result:
126 |                 if field["field"] == "bin(1h)":
127 |                     hour_data["hour"] = field["value"]
128 |                 elif field["field"] == "count":
129 |                     hour_data["count"] = int(field["value"])
130 | 
131 |             if hour_data:
132 |                 summary["hourlyDistribution"].append(hour_data)
133 | 
134 |         return json.dumps(summary, indent=2)
135 | 
136 |     @handle_exceptions
137 |     async def find_error_patterns(
138 |         self,
139 |         log_group_name: str,
140 |         hours: int = 24,
141 |         start_time: str = None,
142 |         end_time: str = None,
143 |     ) -> str:
144 |         """
145 |         Find common error patterns in logs.
146 | 
147 |         Args:
148 |             log_group_name: The log group to analyze
149 |             hours: Number of hours to look back
150 |             start_time: Optional start time in ISO8601 format
151 |             end_time: Optional end time in ISO8601 format
152 | 
153 |         Returns:
154 |             JSON string with error patterns
155 |         """
156 |         start_ts, end_ts = get_time_range(hours, start_time, end_time)
157 | 
158 |         # Query for error logs
159 |         error_query = """
160 |         filter @message like /(?i)(error|exception|fail|traceback)/
161 |         | stats count(*) as errorCount by @message
162 |         | sort errorCount desc
163 |         | limit 20
164 |         """
165 | 
166 |         # Start the query
167 |         start_query_response = self.logs_client.start_query(
168 |             logGroupName=log_group_name,
169 |             startTime=start_ts,
170 |             endTime=end_ts,
171 |             queryString=error_query,
172 |         )
173 | 
174 |         query_id = start_query_response["queryId"]
175 | 
176 |         # Poll for query results
177 |         response = self.logs_client.get_query_results(queryId=query_id)
178 |         while response["status"] in ("Scheduled", "Running"):
179 |             await asyncio.sleep(1)  # Wait before checking again
180 |             response = self.logs_client.get_query_results(queryId=query_id)
181 | 
182 |         # Process the results
183 |         error_patterns = {
184 |             "timeRange": {
185 |                 "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
186 |                 "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
187 |                 "hours": hours,
188 |             },
189 |             "errorPatterns": [],
190 |         }
191 | 
192 |         for result in response.get("results", []):
193 |             pattern = {}
194 |             for field in result:
195 |                 if field["field"] == "@message":
196 |                     pattern["message"] = field["value"]
197 |                 elif field["field"] == "errorCount":
198 |                     pattern["count"] = int(field["value"])
199 | 
200 |             if pattern:
201 |                 error_patterns["errorPatterns"].append(pattern)
202 | 
203 |         return json.dumps(error_patterns, indent=2)
204 | 
```
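
Every analysis tool above follows the same asynchronous CloudWatch Logs Insights flow: call `start_query`, then poll `get_query_results` until the status leaves `Scheduled`/`Running`. Below is a minimal standalone sketch of that flow with plain boto3, not part of the repository itself: the log group name and query are placeholders, and credentials/region are assumed to come from the default chain. Note that `start_query` takes timestamps as epoch seconds.

```python
import asyncio
import time

import boto3


async def run_insights_query(log_group: str, query: str, hours: int = 1) -> list:
    """Run a CloudWatch Logs Insights query and return its result rows."""
    logs = boto3.client("logs")
    end_ts = int(time.time())  # start_query expects epoch seconds
    start_ts = end_ts - hours * 3600

    query_id = logs.start_query(
        logGroupName=log_group,
        startTime=start_ts,
        endTime=end_ts,
        queryString=query,
    )["queryId"]

    # Poll until the query leaves the Scheduled/Running states
    while True:
        response = logs.get_query_results(queryId=query_id)
        if response["status"] not in ("Scheduled", "Running"):
            break
        await asyncio.sleep(1)

    # Each result row is a list of {"field": ..., "value": ...} dicts
    return response.get("results", [])


if __name__ == "__main__":
    # Placeholder log group; any stats query works here
    rows = asyncio.run(
        run_insights_query("/aws/lambda/my-function", "stats count(*) as total")
    )
    print(rows)
```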

--------------------------------------------------------------------------------
/src/cw-mcp-server/server.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | 
  3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4 | # SPDX-License-Identifier: Apache-2.0
  5 | 
  6 | import sys
  7 | import os
  8 | import argparse
  9 | from typing import List, Callable, Any, Type, Optional
 10 | from functools import wraps
 11 | import asyncio
 12 | 
 13 | from mcp.server.fastmcp import FastMCP
 14 | from resources.cloudwatch_logs_resource import CloudWatchLogsResource
 15 | from tools.search_tools import CloudWatchLogsSearchTools
 16 | from tools.analysis_tools import CloudWatchLogsAnalysisTools
 17 | from tools.correlation_tools import CloudWatchLogsCorrelationTools
 18 | 
 19 | # Parse command line arguments
 20 | parser = argparse.ArgumentParser(description="CloudWatch Logs Analyzer MCP Server")
 21 | parser.add_argument(
 22 |     "--profile", type=str, help="AWS profile name to use for credentials"
 23 | )
 24 | parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
 25 | parser.add_argument(
 26 |     "--stateless", action="store_true", help="Stateless HTTP mode", default=False
 27 | )
 28 | args, unknown = parser.parse_known_args()
 29 | 
 30 | # Add the current directory to the path so we can import our modules
 31 | current_dir = os.path.dirname(os.path.abspath(__file__))
 32 | sys.path.append(current_dir)
 33 | 
 34 | # Create the MCP server for CloudWatch logs
 35 | mcp = FastMCP("CloudWatch Logs Analyzer", stateless_http=args.stateless)
 36 | 
 37 | # Initialize our resource and tools classes with the specified AWS profile and region
 38 | cw_resource = CloudWatchLogsResource(profile_name=args.profile, region_name=args.region)
 39 | search_tools = CloudWatchLogsSearchTools(
 40 |     profile_name=args.profile, region_name=args.region
 41 | )
 42 | analysis_tools = CloudWatchLogsAnalysisTools(
 43 |     profile_name=args.profile, region_name=args.region
 44 | )
 45 | correlation_tools = CloudWatchLogsCorrelationTools(
 46 |     profile_name=args.profile, region_name=args.region
 47 | )
 48 | 
 49 | # Capture the parsed CLI profile and region in separate variables
 50 | default_profile = args.profile
 51 | default_region = args.region
 52 | 
 53 | 
 54 | # Helper decorator to handle profile and region parameters for tools
 55 | def with_aws_config(tool_class: Type, method_name: Optional[str] = None) -> Callable:
 56 |     """
 57 |     Decorator that handles the profile and region parameters for tool functions.
 58 |     Creates a new instance of the specified tool class with the correct profile and region.
 59 | 
 60 |     Args:
 61 |         tool_class: The class to instantiate with the profile and region
 62 |         method_name: Optional method name if different from the decorated function
 63 |     """
 64 | 
 65 |     def decorator(func: Callable) -> Callable:
 66 |         @wraps(func)
 67 |         async def wrapper(*args, **kwargs) -> Any:
 68 |             target_method = method_name or func.__name__
 69 |             try:
 70 |                 profile = kwargs.pop("profile", None) or default_profile
 71 |                 region = kwargs.pop("region", None) or default_region
 72 |                 tool_instance = tool_class(profile_name=profile, region_name=region)
 73 |                 method = getattr(tool_instance, target_method)
 74 |                 result = method(**kwargs)
 75 |                 if asyncio.iscoroutine(result):
 76 |                     return await result
 77 |                 return result
 78 |             except AttributeError as e:
 79 |                 raise RuntimeError(
 80 |                     f"Method {target_method} not found in {tool_class.__name__}: {e}"
 81 |                 ) from e
 82 |             except Exception as e:
 83 |                 raise RuntimeError(
 84 |                     f"Error executing {target_method} in {tool_class.__name__}: {e}"
 85 |                 ) from e
 86 | 
 87 |         return wrapper
 88 | 
 89 |     return decorator
 90 | 
 91 | 
 92 | # ==============================
 93 | # Resource Handlers
 94 | # ==============================
 95 | 
 96 | 
 97 | @mcp.resource("logs://groups")
 98 | def get_log_groups() -> str:
 99 |     """Get a list of all CloudWatch Log Groups"""
100 |     # Use default values for parameters
101 |     prefix = None
102 |     limit = 50
103 |     next_token = None
104 | 
105 |     return cw_resource.get_log_groups(prefix, limit, next_token)
106 | 
107 | 
108 | @mcp.resource("logs://groups/filter/{prefix}")
109 | def get_filtered_log_groups(prefix: str) -> str:
110 |     """
111 |     Get a filtered list of CloudWatch Log Groups by prefix
112 | 
113 |     Args:
114 |         prefix: The prefix to filter log groups by
115 |     """
116 |     # Default values for other parameters
117 |     limit = 50
118 |     next_token = None
119 | 
120 |     return cw_resource.get_log_groups(prefix, limit, next_token)
121 | 
122 | 
123 | @mcp.resource("logs://groups/{log_group_name}")
124 | def get_log_group_details(log_group_name: str) -> str:
125 |     """Get detailed information about a specific log group"""
126 |     return cw_resource.get_log_group_details(log_group_name)
127 | 
128 | 
129 | @mcp.resource("logs://groups/{log_group_name}/streams")
130 | def get_log_streams(log_group_name: str) -> str:
131 |     """
132 |     Get a list of log streams for a specific log group
133 | 
134 |     Args:
135 |         log_group_name: The name of the log group
136 |     """
137 |     # Use default limit value
138 |     limit = 20
139 |     return cw_resource.get_log_streams(log_group_name, limit)
140 | 
141 | 
142 | @mcp.resource("logs://groups/{log_group_name}/streams/{log_stream_name}")
143 | def get_log_events(log_group_name: str, log_stream_name: str) -> str:
144 |     """
145 |     Get log events from a specific log stream
146 | 
147 |     Args:
148 |         log_group_name: The name of the log group
149 |         log_stream_name: The name of the log stream
150 |     """
151 |     # Use default limit value
152 |     limit = 100
153 |     return cw_resource.get_log_events(log_group_name, log_stream_name, limit)
154 | 
155 | 
156 | @mcp.resource("logs://groups/{log_group_name}/sample")
157 | def get_log_sample(log_group_name: str) -> str:
158 |     """
159 |     Get a sample of recent logs from a log group
160 | 
161 |     Args:
162 |         log_group_name: The name of the log group
163 |     """
164 |     # Use default limit value
165 |     limit = 10
166 |     return cw_resource.get_log_sample(log_group_name, limit)
167 | 
168 | 
169 | @mcp.resource("logs://groups/{log_group_name}/recent-errors")
170 | def get_recent_errors(log_group_name: str) -> str:
171 |     """
172 |     Get recent error logs from a log group
173 | 
174 |     Args:
175 |         log_group_name: The name of the log group
176 |     """
177 |     # Use default hours value
178 |     hours = 24
179 |     return cw_resource.get_recent_errors(log_group_name, hours)
180 | 
181 | 
182 | @mcp.resource("logs://groups/{log_group_name}/metrics")
183 | def get_log_metrics(log_group_name: str) -> str:
184 |     """
185 |     Get log volume metrics for a log group
186 | 
187 |     Args:
188 |         log_group_name: The name of the log group
189 |     """
190 |     # Use default hours value
191 |     hours = 24
192 |     return cw_resource.get_log_metrics(log_group_name, hours)
193 | 
194 | 
195 | @mcp.resource("logs://groups/{log_group_name}/structure")
196 | def analyze_log_structure(log_group_name: str) -> str:
197 |     """Analyze and provide information about the structure of logs"""
198 |     return cw_resource.analyze_log_structure(log_group_name)
199 | 
200 | 
201 | # ==============================
202 | # Prompts
203 | # ==============================
204 | 
205 | 
206 | @mcp.prompt()
207 | def list_cloudwatch_log_groups(
208 |     prefix: str = None, profile: str = None, region: str = None
209 | ) -> str:
210 |     """
211 |     Prompt for listing and exploring CloudWatch log groups.
212 | 
213 |     Args:
214 |         prefix: Optional prefix to filter log groups by name
215 |         profile: Optional AWS profile name to use for credentials
216 |         region: Optional AWS region name to use for API calls
217 |     """
218 |     profile_text = f" using profile '{profile}'" if profile else ""
219 |     region_text = f" in region '{region}'" if region else ""
220 |     prefix_text = f" with prefix '{prefix}'" if prefix else ""
221 | 
222 |     return f"""I'll help you explore the CloudWatch log groups in your AWS environment{profile_text}{region_text}.
223 | 
224 | First, I'll list the available log groups{prefix_text}.
225 | 
226 | For each log group, I can help you:
227 | 1. Get detailed information about the group (retention, size, etc.)
228 | 2. Check for recent errors or patterns
229 | 3. View metrics like volume and activity
230 | 4. Sample recent logs to understand the content
231 | 5. Search for specific patterns or events
232 | 
233 | Let me know which log group you'd like to explore further, or if you'd like to refine the search with a different prefix.
234 | """
235 | 
236 | 
237 | @mcp.prompt()
238 | def analyze_cloudwatch_logs(
239 |     log_group_name: str, profile: str = None, region: str = None
240 | ) -> str:
241 |     """
242 |     Prompt for analyzing CloudWatch logs to help identify issues, patterns, and insights.
243 | 
244 |     Args:
245 |         log_group_name: The name of the log group to analyze
246 |         profile: Optional AWS profile name to use for credentials
247 |         region: Optional AWS region name to use for API calls
248 |     """
249 |     profile_text = f" using profile '{profile}'" if profile else ""
250 |     region_text = f" in region '{region}'" if region else ""
251 | 
252 |     return f"""Please analyze the following CloudWatch logs from the {log_group_name} log group{profile_text}{region_text}.
253 | 
254 | First, I'll get you some information about the log group:
255 | 1. Get the basic log group structure to understand the format of logs
256 | 2. Check for any recent errors
257 | 3. Examine the log volume metrics
258 | 4. Analyze a sample of recent logs
259 | 
260 | Based on this information, please:
261 | - Identify any recurring errors or exceptions
262 | - Look for unusual patterns or anomalies
263 | - Suggest possible root causes for any issues found
264 | - Recommend actions to resolve or mitigate problems
265 | - Provide insights on performance or resource utilization
266 | 
267 | Feel free to ask for additional context if needed, such as:
268 | - Correlation with logs from other services
269 | - More specific time ranges for analysis
270 | - Queries for specific error messages or events
271 | """
272 | 
273 | 
274 | # ==============================
275 | # Tool Handlers
276 | # ==============================
277 | 
278 | 
279 | @mcp.tool()
280 | @with_aws_config(CloudWatchLogsResource, method_name="get_log_groups")
281 | async def list_log_groups(
282 |     prefix: str = None,
283 |     limit: int = 50,
284 |     next_token: str = None,
285 |     profile: str = None,
286 |     region: str = None,
287 | ) -> str:
288 |     """
289 |     List available CloudWatch log groups with optional filtering by prefix.
290 | 
291 |     Args:
292 |         prefix: Optional prefix to filter log groups by name
293 |         limit: Maximum number of log groups to return (default: 50)
294 |         next_token: Token for pagination to get the next set of results
295 |         profile: Optional AWS profile name to use for credentials
296 |         region: Optional AWS region name to use for API calls
297 | 
298 |     Returns:
299 |         JSON string with log groups information
300 |     """
301 |     # Function body is handled by the decorator
302 |     pass
303 | 
304 | 
305 | @mcp.tool()
306 | @with_aws_config(CloudWatchLogsSearchTools)
307 | async def search_logs(
308 |     log_group_name: str,
309 |     query: str,
310 |     hours: int = 24,
311 |     start_time: str = None,
312 |     end_time: str = None,
313 |     profile: str = None,
314 |     region: str = None,
315 | ) -> str:
316 |     """
317 |     Search logs using CloudWatch Logs Insights query.
318 | 
319 |     Args:
320 |         log_group_name: The log group to search
321 |         query: CloudWatch Logs Insights query syntax
322 |         hours: Number of hours to look back
323 |         start_time: Optional ISO8601 start time
324 |         end_time: Optional ISO8601 end time
325 |         profile: Optional AWS profile name to use for credentials
326 |         region: Optional AWS region name to use for API calls
327 | 
328 |     Returns:
329 |         JSON string with search results
330 |     """
331 |     # Function body is handled by the decorator
332 |     pass
333 | 
334 | 
335 | @mcp.tool()
336 | @with_aws_config(CloudWatchLogsSearchTools)
337 | async def search_logs_multi(
338 |     log_group_names: List[str],
339 |     query: str,
340 |     hours: int = 24,
341 |     start_time: str = None,
342 |     end_time: str = None,
343 |     profile: str = None,
344 |     region: str = None,
345 | ) -> str:
346 |     """
347 |     Search logs across multiple log groups using CloudWatch Logs Insights.
348 | 
349 |     Args:
350 |         log_group_names: List of log groups to search
351 |         query: CloudWatch Logs Insights query in Logs Insights syntax
352 |         hours: Number of hours to look back (default: 24)
353 |         start_time: Optional ISO8601 start time
354 |         end_time: Optional ISO8601 end time
355 |         profile: Optional AWS profile name to use for credentials
356 |         region: Optional AWS region name to use for API calls
357 | 
358 |     Returns:
359 |         JSON string with search results
360 |     """
361 |     # Function body is handled by the decorator
362 |     pass
363 | 
364 | 
365 | @mcp.tool()
366 | @with_aws_config(CloudWatchLogsSearchTools)
367 | async def filter_log_events(
368 |     log_group_name: str,
369 |     filter_pattern: str,
370 |     hours: int = 24,
371 |     start_time: str = None,
372 |     end_time: str = None,
373 |     profile: str = None,
374 |     region: str = None,
375 | ) -> str:
376 |     """
377 |     Filter log events by pattern across all streams in a log group.
378 | 
379 |     Args:
380 |         log_group_name: The log group to filter
381 |         filter_pattern: The pattern to search for (CloudWatch Logs filter syntax)
382 |         hours: Number of hours to look back
383 |         start_time: Optional ISO8601 start time
384 |         end_time: Optional ISO8601 end time
385 |         profile: Optional AWS profile name to use for credentials
386 |         region: Optional AWS region name to use for API calls
387 | 
388 |     Returns:
389 |         JSON string with filtered events
390 |     """
391 |     # Function body is handled by the decorator
392 |     pass
393 | 
394 | 
395 | @mcp.tool()
396 | @with_aws_config(CloudWatchLogsAnalysisTools)
397 | async def summarize_log_activity(
398 |     log_group_name: str,
399 |     hours: int = 24,
400 |     start_time: str = None,
401 |     end_time: str = None,
402 |     profile: str = None,
403 |     region: str = None,
404 | ) -> str:
405 |     """
406 |     Generate a summary of log activity over a specified time period.
407 | 
408 |     Args:
409 |         log_group_name: The log group to analyze
410 |         hours: Number of hours to look back
411 |         start_time: Optional ISO8601 start time
412 |         end_time: Optional ISO8601 end time
413 |         profile: Optional AWS profile name to use for credentials
414 |         region: Optional AWS region name to use for API calls
415 | 
416 |     Returns:
417 |         JSON string with activity summary
418 |     """
419 |     # Function body is handled by the decorator
420 |     pass
421 | 
422 | 
423 | @mcp.tool()
424 | @with_aws_config(CloudWatchLogsAnalysisTools)
425 | async def find_error_patterns(
426 |     log_group_name: str,
427 |     hours: int = 24,
428 |     start_time: str = None,
429 |     end_time: str = None,
430 |     profile: str = None,
431 |     region: str = None,
432 | ) -> str:
433 |     """
434 |     Find common error patterns in logs.
435 | 
436 |     Args:
437 |         log_group_name: The log group to analyze
438 |         hours: Number of hours to look back
439 |         start_time: Optional ISO8601 start time
440 |         end_time: Optional ISO8601 end time
441 |         profile: Optional AWS profile name to use for credentials
442 |         region: Optional AWS region name to use for API calls
443 | 
444 |     Returns:
445 |         JSON string with error patterns
446 |     """
447 |     # Function body is handled by the decorator
448 |     pass
449 | 
450 | 
451 | @mcp.tool()
452 | @with_aws_config(CloudWatchLogsCorrelationTools)
453 | async def correlate_logs(
454 |     log_group_names: List[str],
455 |     search_term: str,
456 |     hours: int = 24,
457 |     start_time: str = None,
458 |     end_time: str = None,
459 |     profile: str = None,
460 |     region: str = None,
461 | ) -> str:
462 |     """
463 |     Correlate logs across multiple AWS services using a common search term.
464 | 
465 |     Args:
466 |         log_group_names: List of log group names to search
467 |         search_term: Term to search for in logs (request ID, transaction ID, etc.)
468 |         hours: Number of hours to look back
469 |         start_time: Optional ISO8601 start time
470 |         end_time: Optional ISO8601 end time
471 |         profile: Optional AWS profile name to use for credentials
472 |         region: Optional AWS region name to use for API calls
473 | 
474 |     Returns:
475 |         JSON string with correlated events
476 |     """
477 |     # Function body is handled by the decorator
478 |     pass
479 | 
480 | 
481 | if __name__ == "__main__":
482 |     # Run the MCP server
483 |     mcp.run()
484 | 
```
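
All tools registered above accept optional `profile` and `region` arguments that fall back to the server-wide defaults captured at startup, so an MCP client can override AWS credentials per call. A sketch of such a call over stdio, mirroring the session setup used in `src/client.py`; the server path assumes the repository root as the working directory, and the log group name and profile value are placeholders:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumes this script is run from the repository root
    params = StdioServerParameters(
        command="python3", args=["src/cw-mcp-server/server.py"], env=None
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "search_logs",
                arguments={
                    "log_group_name": "/aws/lambda/my-function",  # placeholder
                    "query": "fields @timestamp, @message | limit 5",
                    "profile": "staging",  # per-call override of the server default
                },
            )
            print(result)


if __name__ == "__main__":
    asyncio.run(main())
```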

--------------------------------------------------------------------------------
/src/cw-mcp-server/resources/cloudwatch_logs_resource.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | 
  3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4 | # SPDX-License-Identifier: Apache-2.0
  5 | 
  6 | import boto3
  7 | import json
  8 | from datetime import datetime, timedelta, timezone
  9 | from typing import Dict, List
 10 | import re
 11 | from collections import Counter
 12 | 
 13 | 
 14 | class CloudWatchLogsResource:
 15 |     """Resource class for handling CloudWatch Logs resources."""
 16 | 
 17 |     def __init__(self, profile_name=None, region_name=None):
 18 |         """Initialize the CloudWatch Logs resource client.
 19 | 
 20 |         Args:
 21 |             profile_name: Optional AWS profile name to use for credentials
 22 |             region_name: Optional AWS region name to use for API calls
 23 |         """
 24 |         # Store the profile name and region for later use
 25 |         self.profile_name = profile_name
 26 |         self.region_name = region_name
 27 | 
 28 |         # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
 29 |         session = boto3.Session(profile_name=profile_name, region_name=region_name)
 30 |         self.logs_client = session.client("logs")
 31 | 
 32 |     def get_log_groups(
 33 |         self, prefix: str = None, limit: int = 50, next_token: str = None
 34 |     ) -> str:
 35 |         """
 36 |         Get a list of CloudWatch Log Groups with optional filtering and pagination.
 37 | 
 38 |         Args:
 39 |             prefix: Optional prefix to filter log groups by name
 40 |             limit: Maximum number of log groups to return (default: 50)
 41 |             next_token: Token for pagination to get the next set of results
 42 | 
 43 |         Returns:
 44 |             JSON string with log groups information
 45 |         """
 46 |         kwargs = {"limit": limit}
 47 |         if prefix:
 48 |             kwargs["logGroupNamePrefix"] = prefix
 49 |         if next_token:
 50 |             kwargs["nextToken"] = next_token
 51 | 
 52 |         response = self.logs_client.describe_log_groups(**kwargs)
 53 |         log_groups = response.get("logGroups", [])
 54 | 
 55 |         # Format the log groups information
 56 |         formatted_groups = []
 57 |         for group in log_groups:
 58 |             formatted_groups.append(
 59 |                 {
 60 |                     "name": group.get("logGroupName"),
 61 |                     "arn": group.get("arn"),
 62 |                     "storedBytes": group.get("storedBytes"),
 63 |                     "creationTime": datetime.fromtimestamp(
 64 |                         group.get("creationTime", 0) / 1000
 65 |                     ).isoformat(),
 66 |                 }
 67 |             )
 68 | 
 69 |         # Include the nextToken if available
 70 |         result = {"logGroups": formatted_groups}
 71 | 
 72 |         if "nextToken" in response:
 73 |             result["nextToken"] = response["nextToken"]
 74 | 
 75 |         return json.dumps(result, indent=2)
 76 | 
 77 |     def get_log_group_details(self, log_group_name: str) -> str:
 78 |         """Get detailed information about a specific log group."""
 79 |         try:
 80 |             response = self.logs_client.describe_log_groups(
 81 |                 logGroupNamePrefix=log_group_name, limit=1
 82 |             )
 83 |             log_groups = response.get("logGroups", [])
 84 |             # Prefix search can match a different group; require an exact name match
 85 |             if not log_groups or log_groups[0].get("logGroupName") != log_group_name:
 86 |                 return json.dumps(
 87 |                     {"error": f"Log group '{log_group_name}' not found"}, indent=2
 88 |                 )
 89 | 
 90 |             log_group = log_groups[0]
 91 | 
 92 |             # Get retention policy
 93 |             retention = "Never Expire"
 94 |             if "retentionInDays" in log_group:
 95 |                 retention = f"{log_group['retentionInDays']} days"
 96 | 
 97 |             # Get metrics for the log group
 98 |             session = boto3.Session(
 99 |                 profile_name=self.profile_name, region_name=self.region_name
100 |             )
101 |             cloudwatch = session.client("cloudwatch")
102 |             end_time = datetime.now(timezone.utc)
103 |             start_time = end_time - timedelta(days=1)
104 | 
105 |             metrics_response = cloudwatch.get_metric_statistics(
106 |                 Namespace="AWS/Logs",
107 |                 MetricName="IncomingBytes",
108 |                 Dimensions=[
109 |                     {"Name": "LogGroupName", "Value": log_group_name},
110 |                 ],
111 |                 StartTime=start_time,
112 |                 EndTime=end_time,
113 |                 Period=3600,
114 |                 Statistics=["Sum"],
115 |             )
116 | 
117 |             # Format the detailed information
118 |             details = {
119 |                 "name": log_group.get("logGroupName"),
120 |                 "arn": log_group.get("arn"),
121 |                 "storedBytes": log_group.get("storedBytes"),
122 |                 "creationTime": datetime.fromtimestamp(
123 |                     log_group.get("creationTime", 0) / 1000
124 |                 ).isoformat(),
125 |                 "retentionPolicy": retention,
126 |                 "metricFilterCount": log_group.get("metricFilterCount", 0),
127 |                 "kmsKeyId": log_group.get("kmsKeyId", "Not encrypted with KMS"),
128 |                 "dailyIncomingBytes": [
129 |                     {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
130 |                     for point in metrics_response.get("Datapoints", [])
131 |                 ],
132 |             }
133 | 
134 |             return json.dumps(details, indent=2)
135 |         except Exception as e:
136 |             return json.dumps({"error": str(e)}, indent=2)
137 | 
138 |     def get_log_streams(self, log_group_name: str, limit: int = 20) -> str:
139 |         """
140 |         Get a list of log streams for a specific log group.
141 | 
142 |         Args:
143 |             log_group_name: The name of the log group
144 |             limit: Maximum number of streams to return (default: 20)
145 |         """
146 |         try:
147 |             response = self.logs_client.describe_log_streams(
148 |                 logGroupName=log_group_name,
149 |                 orderBy="LastEventTime",
150 |                 descending=True,
151 |                 limit=limit,
152 |             )
153 | 
154 |             log_streams = response.get("logStreams", [])
155 |             formatted_streams = []
156 | 
157 |             for stream in log_streams:
158 |                 last_event_time = stream.get("lastEventTimestamp", 0)
159 |                 first_event_time = stream.get("firstEventTimestamp", 0)
160 | 
161 |                 formatted_streams.append(
162 |                     {
163 |                         "name": stream.get("logStreamName"),
164 |                         "firstEventTime": datetime.fromtimestamp(
165 |                             first_event_time / 1000
166 |                         ).isoformat()
167 |                         if first_event_time
168 |                         else None,
169 |                         "lastEventTime": datetime.fromtimestamp(
170 |                             last_event_time / 1000
171 |                         ).isoformat()
172 |                         if last_event_time
173 |                         else None,
174 |                         "storedBytes": stream.get("storedBytes"),
175 |                     }
176 |                 )
177 | 
178 |             return json.dumps(formatted_streams, indent=2)
179 |         except Exception as e:
180 |             return json.dumps({"error": str(e)}, indent=2)
181 | 
182 |     def get_log_events(
183 |         self, log_group_name: str, log_stream_name: str, limit: int = 100
184 |     ) -> str:
185 |         """
186 |         Get log events from a specific log stream.
187 | 
188 |         Args:
189 |             log_group_name: The name of the log group
190 |             log_stream_name: The name of the log stream
191 |             limit: Maximum number of events to return (default: 100)
192 |         """
193 |         try:
194 |             response = self.logs_client.get_log_events(
195 |                 logGroupName=log_group_name,
196 |                 logStreamName=log_stream_name,
197 |                 limit=limit,
198 |                 startFromHead=False,
199 |             )
200 | 
201 |             events = response.get("events", [])
202 |             formatted_events = []
203 | 
204 |             for event in events:
205 |                 formatted_events.append(
206 |                     {
207 |                         "timestamp": datetime.fromtimestamp(
208 |                             event.get("timestamp", 0) / 1000
209 |                         ).isoformat(),
210 |                         "message": event.get("message"),
211 |                         "ingestionTime": datetime.fromtimestamp(
212 |                             event.get("ingestionTime", 0) / 1000
213 |                         ).isoformat(),
214 |                     }
215 |                 )
216 | 
217 |             return json.dumps(formatted_events, indent=2)
218 |         except Exception as e:
219 |             return json.dumps({"error": str(e)}, indent=2)
220 | 
221 |     def get_log_sample(self, log_group_name: str, limit: int = 10) -> str:
222 |         """Get a sample of recent logs from a log group."""
223 |         try:
224 |             # First get the most recent stream
225 |             stream_response = self.logs_client.describe_log_streams(
226 |                 logGroupName=log_group_name,
227 |                 orderBy="LastEventTime",
228 |                 descending=True,
229 |                 limit=1,
230 |             )
231 | 
232 |             log_streams = stream_response.get("logStreams", [])
233 |             if not log_streams:
234 |                 return json.dumps(
235 |                     {"error": f"No streams found in log group '{log_group_name}'"},
236 |                     indent=2,
237 |                 )
238 | 
239 |             # Get events from the most recent stream
240 |             log_stream_name = log_streams[0].get("logStreamName")
241 |             response = self.logs_client.get_log_events(
242 |                 logGroupName=log_group_name,
243 |                 logStreamName=log_stream_name,
244 |                 limit=limit,
245 |                 startFromHead=False,
246 |             )
247 | 
248 |             events = response.get("events", [])
249 |             formatted_events = []
250 | 
251 |             for event in events:
252 |                 formatted_events.append(
253 |                     {
254 |                         "timestamp": datetime.fromtimestamp(
255 |                             event.get("timestamp", 0) / 1000
256 |                         ).isoformat(),
257 |                         "message": event.get("message"),
258 |                         "streamName": log_stream_name,
259 |                     }
260 |                 )
261 | 
262 |             return json.dumps(
263 |                 {
264 |                     "description": f"Sample of {len(formatted_events)} recent logs from '{log_group_name}'",
265 |                     "logStream": log_stream_name,
266 |                     "events": formatted_events,
267 |                 },
268 |                 indent=2,
269 |             )
270 |         except Exception as e:
271 |             return json.dumps({"error": str(e)}, indent=2)
272 | 
273 |     def get_recent_errors(self, log_group_name: str, hours: int = 24) -> str:
274 |         """Get recent error logs from a log group."""
275 |         try:
276 |             # Calculate start time
277 |             end_time = int(datetime.now().timestamp() * 1000)
278 |             start_time = int(
279 |                 (datetime.now() - timedelta(hours=hours)).timestamp() * 1000
280 |             )
281 | 
282 |             # Use filter_log_events to search for errors across all streams
283 |             # Common error patterns to search for (matching is case-sensitive)
284 |             error_patterns = [
285 |                 "ERROR",
286 |                 "Error",
287 |                 "error",
288 |                 "exception",
289 |                 "Exception",
290 |                 "EXCEPTION",
291 |                 "fail",
292 |                 "Fail",
293 |                 "FAIL",
294 |             ]
295 |             # "?" prefixes give OR semantics: match ANY term rather than ALL
296 |             filter_pattern = " ".join(f"?{pattern}" for pattern in error_patterns)
297 |             response = self.logs_client.filter_log_events(
298 |                 logGroupName=log_group_name,
299 |                 filterPattern=filter_pattern,
300 |                 startTime=start_time,
301 |                 endTime=end_time,
302 |                 limit=100,
303 |             )
304 | 
305 |             events = response.get("events", [])
306 |             formatted_events = []
307 | 
308 |             for event in events:
309 |                 formatted_events.append(
310 |                     {
311 |                         "timestamp": datetime.fromtimestamp(
312 |                             event.get("timestamp", 0) / 1000
313 |                         ).isoformat(),
314 |                         "message": event.get("message"),
315 |                         "logStreamName": event.get("logStreamName"),
316 |                     }
317 |                 )
318 | 
319 |             return json.dumps(
320 |                 {
321 |                     "description": f"Recent errors from '{log_group_name}' in the last {hours} hours",
322 |                     "totalErrors": len(formatted_events),
323 |                     "events": formatted_events,
324 |                 },
325 |                 indent=2,
326 |             )
327 |         except Exception as e:
328 |             return json.dumps({"error": str(e)}, indent=2)
329 | 
330 |     def get_log_metrics(self, log_group_name: str, hours: int = 24) -> str:
331 |         """Get log volume metrics for a log group."""
332 |         try:
333 |             # Create CloudWatch client
334 |             session = boto3.Session(
335 |                 profile_name=self.profile_name, region_name=self.region_name
336 |             )
337 |             cloudwatch = session.client("cloudwatch")
338 | 
339 |             # Calculate start and end times
340 |             end_time = datetime.now(timezone.utc)
341 |             start_time = end_time - timedelta(hours=hours)
342 | 
343 |             # Get incoming bytes
344 |             incoming_bytes = cloudwatch.get_metric_statistics(
345 |                 Namespace="AWS/Logs",
346 |                 MetricName="IncomingBytes",
347 |                 Dimensions=[
348 |                     {"Name": "LogGroupName", "Value": log_group_name},
349 |                 ],
350 |                 StartTime=start_time,
351 |                 EndTime=end_time,
352 |                 Period=3600,  # 1 hour periods
353 |                 Statistics=["Sum"],
354 |             )
355 | 
356 |             # Get incoming log events
357 |             incoming_events = cloudwatch.get_metric_statistics(
358 |                 Namespace="AWS/Logs",
359 |                 MetricName="IncomingLogEvents",
360 |                 Dimensions=[
361 |                     {"Name": "LogGroupName", "Value": log_group_name},
362 |                 ],
363 |                 StartTime=start_time,
364 |                 EndTime=end_time,
365 |                 Period=3600,  # 1 hour periods
366 |                 Statistics=["Sum"],
367 |             )
368 | 
369 |             # Format metrics data
370 |             bytes_datapoints = incoming_bytes.get("Datapoints", [])
371 |             events_datapoints = incoming_events.get("Datapoints", [])
372 | 
373 |             bytes_datapoints.sort(key=lambda x: x["Timestamp"])
374 |             events_datapoints.sort(key=lambda x: x["Timestamp"])
375 | 
376 |             bytes_data = [
377 |                 {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
378 |                 for point in bytes_datapoints
379 |             ]
380 | 
381 |             events_data = [
382 |                 {"timestamp": point["Timestamp"].isoformat(), "events": point["Sum"]}
383 |                 for point in events_datapoints
384 |             ]
385 | 
386 |             # Calculate totals
387 |             total_bytes = sum(point["Sum"] for point in bytes_datapoints)
388 |             total_events = sum(point["Sum"] for point in events_datapoints)
389 | 
390 |             return json.dumps(
391 |                 {
392 |                     "description": f"Log metrics for '{log_group_name}' over the last {hours} hours",
393 |                     "totalBytes": total_bytes,
394 |                     "totalEvents": total_events,
395 |                     "bytesByHour": bytes_data,
396 |                     "eventsByHour": events_data,
397 |                 },
398 |                 indent=2,
399 |             )
400 |         except Exception as e:
401 |             return json.dumps({"error": str(e)}, indent=2)
402 | 
403 |     def analyze_log_structure(self, log_group_name: str) -> str:
404 |         """Analyze and provide information about the structure of logs."""
405 |         try:
406 |             # Get a sample of logs to analyze
407 |             sample_data = json.loads(self.get_log_sample(log_group_name, 50))
408 | 
409 |             if "error" in sample_data:
410 |                 return json.dumps(sample_data, indent=2)
411 | 
412 |             events = sample_data.get("events", [])
413 | 
414 |             if not events:
415 |                 return json.dumps(
416 |                     {"error": "No log events found for analysis"}, indent=2
417 |                 )
418 | 
419 |             # Analyze the structure
420 |             structure_info = {
421 |                 "description": f"Log structure analysis for '{log_group_name}'",
422 |                 "sampleSize": len(events),
423 |                 "format": self._detect_log_format(events),
424 |                 "commonPatterns": self._extract_common_patterns(events),
425 |                 "fieldAnalysis": self._analyze_fields(events),
426 |             }
427 | 
428 |             return json.dumps(structure_info, indent=2)
429 |         except Exception as e:
430 |             return json.dumps({"error": str(e)}, indent=2)
431 | 
432 |     def _detect_log_format(self, events: List[Dict]) -> str:
433 |         """Detect the format of logs (JSON, plaintext, etc.)."""
434 |         json_count = 0
435 |         key_value_count = 0
436 |         xml_count = 0
437 | 
438 |         for event in events:
439 |             message = event.get("message", "")
440 | 
441 |             # Check for JSON format
442 |             if message.strip().startswith("{") and message.strip().endswith("}"):
443 |                 try:
444 |                     json.loads(message)
445 |                     json_count += 1
446 |                     continue
447 |                 except json.JSONDecodeError:
448 |                     pass
449 | 
450 |             # Check for XML format
451 |             if message.strip().startswith("<") and message.strip().endswith(">"):
452 |                 xml_count += 1
453 |                 continue
454 | 
455 |             # Check for key-value pairs
456 |             if re.search(r"\w+=[\'\"][^\'\"]*[\'\"]|\w+=\S+", message):
457 |                 key_value_count += 1
458 | 
459 |         total = len(events)
460 | 
461 |         if json_count > total * 0.7:
462 |             return "JSON"
463 |         elif xml_count > total * 0.7:
464 |             return "XML"
465 |         elif key_value_count > total * 0.7:
466 |             return "Key-Value Pairs"
467 |         else:
468 |             return "Plaintext/Unstructured"
469 | 
470 |     def _extract_common_patterns(self, events: List[Dict]) -> Dict:
471 |         """Extract common patterns from log messages."""
472 |         # Look for common log patterns
473 |         level_pattern = re.compile(
474 |             r"\b(DEBUG|INFO|WARN|WARNING|ERROR|FATAL|CRITICAL)\b"
475 |         )
476 |         timestamp_patterns = [
477 |             re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO format
478 |             re.compile(
479 |                 r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
480 |             ),  # Common datetime format
481 |             re.compile(r"\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}"),  # MM/DD/YYYY format
482 |         ]
483 | 
484 |         # Count occurrences
485 |         levels = Counter()
486 |         has_timestamp = 0
487 | 
488 |         for event in events:
489 |             message = event.get("message", "")
490 | 
491 |             # Check log levels
492 |             level_match = level_pattern.search(message)
493 |             if level_match:
494 |                 levels[level_match.group(0)] += 1
495 | 
496 |             # Check timestamps in message content (not event timestamp)
497 |             for pattern in timestamp_patterns:
498 |                 if pattern.search(message):
499 |                     has_timestamp += 1
500 |                     break
501 | 
502 |         return {
503 |             "logLevels": dict(levels),
504 |             "containsTimestamp": has_timestamp,
505 |             "timestampPercentage": round((has_timestamp / len(events)) * 100, 2)
506 |             if events
507 |             else 0,
508 |         }
509 | 
510 |     def _analyze_fields(self, events: List[Dict]) -> Dict:
511 |         """Analyze fields in structured log messages."""
512 |         format_type = self._detect_log_format(events)
513 | 
514 |         if format_type == "JSON":
515 |             # Try to extract fields from JSON logs
516 |             fields_count = Counter()
517 | 
518 |             for event in events:
519 |                 message = event.get("message", "")
520 |                 try:
521 |                     json_data = json.loads(message)
522 |                     for key in json_data.keys():
523 |                         fields_count[key] += 1
524 |                 except json.JSONDecodeError:
525 |                     continue
526 | 
527 |             # Get the most common fields
528 |             common_fields = [
529 |                 {
530 |                     "field": field,
531 |                     "occurrences": count,
532 |                     "percentage": round((count / len(events)) * 100, 2),
533 |                 }
534 |                 for field, count in fields_count.most_common(10)
535 |             ]
536 | 
537 |             return {"commonFields": common_fields, "uniqueFields": len(fields_count)}
538 | 
539 |         elif format_type == "Key-Value Pairs":
540 |             # Try to extract key-value pairs
541 |             key_pattern = re.compile(r"(\w+)=[\'\"]?([^\'\"\s]*)[\'\"]?")
542 |             fields_count = Counter()
543 | 
544 |             for event in events:
545 |                 message = event.get("message", "")
546 |                 matches = key_pattern.findall(message)
547 |                 for key, _ in matches:
548 |                     fields_count[key] += 1
549 | 
550 |             # Get the most common fields
551 |             common_fields = [
552 |                 {
553 |                     "field": field,
554 |                     "occurrences": count,
555 |                     "percentage": round((count / len(events)) * 100, 2),
556 |                 }
557 |                 for field, count in fields_count.most_common(10)
558 |             ]
559 | 
560 |             return {"commonFields": common_fields, "uniqueFields": len(fields_count)}
561 | 
562 |         else:
563 |             return {
564 |                 "analysis": f"Field analysis not applicable for {format_type} format"
565 |             }
566 | 
```
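
A note on `get_recent_errors` above: CloudWatch Logs filter patterns AND plain space-separated terms, so an event would have to contain every term to match; prefixing each term with `?` switches to OR (match-any) semantics, which is what the method relies on. A small sketch of that behavior with plain boto3; the log group name is a placeholder and credentials are assumed to come from the default chain:

```python
import boto3

logs = boto3.client("logs")

# "?" prefix = match ANY of the terms; without it, ALL terms must appear
pattern = "?ERROR ?Exception ?fail"

response = logs.filter_log_events(
    logGroupName="/aws/lambda/my-function",  # placeholder
    filterPattern=pattern,
    limit=10,
)
for event in response.get("events", []):
    print(event["logStreamName"], event["message"][:120])
```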

--------------------------------------------------------------------------------
/src/client.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | 
  3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4 | # SPDX-License-Identifier: Apache-2.0
  5 | 
  6 | import asyncio
  7 | import argparse
  8 | import json
  9 | import sys
 10 | import os
 11 | 
 12 | from mcp import ClientSession, StdioServerParameters
 13 | from mcp.client.stdio import stdio_client
 14 | 
 15 | # Set up argument parser for the CLI
 16 | parser = argparse.ArgumentParser(description="CloudWatch Logs MCP Client")
 17 | parser.add_argument(
 18 |     "--profile", type=str, help="AWS profile name to use for credentials"
 19 | )
 20 | parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
 21 | subparsers = parser.add_subparsers(dest="command", help="Command to execute")
 22 | 
 23 | # List log groups command
 24 | list_groups_parser = subparsers.add_parser(
 25 |     "list-groups", help="List CloudWatch log groups"
 26 | )
 27 | list_groups_parser.add_argument("--prefix", help="Filter log groups by name prefix")
 28 | list_groups_parser.add_argument(
 29 |     "--limit",
 30 |     type=int,
 31 |     default=50,
 32 |     help="Maximum number of log groups to return (default: 50)",
 33 | )
 34 | list_groups_parser.add_argument(
 35 |     "--next-token", help="Token for pagination to get the next set of results"
 36 | )
 37 | list_groups_parser.add_argument(
 38 |     "--use-tool", action="store_true", help="Use the tool interface instead of resource"
 39 | )
 40 | list_groups_parser.add_argument(
 41 |     "--profile", help="AWS profile name to use for credentials"
 42 | )
 43 | list_groups_parser.add_argument("--region", help="AWS region name to use for API calls")
 44 | 
 45 | # Get log group details command
 46 | group_details_parser = subparsers.add_parser(
 47 |     "group-details", help="Get detailed information about a log group"
 48 | )
 49 | group_details_parser.add_argument("log_group_name", help="The name of the log group")
 50 | group_details_parser.add_argument(
 51 |     "--profile", help="AWS profile name to use for credentials"
 52 | )
 53 | group_details_parser.add_argument(
 54 |     "--region", help="AWS region name to use for API calls"
 55 | )
 56 | 
 57 | # List log streams command
 58 | list_streams_parser = subparsers.add_parser(
 59 |     "list-streams", help="List log streams for a specific log group"
 60 | )
 61 | list_streams_parser.add_argument("log_group_name", help="The name of the log group")
 62 | list_streams_parser.add_argument(
 63 |     "--profile", help="AWS profile name to use for credentials"
 64 | )
 65 | list_streams_parser.add_argument(
 66 |     "--region", help="AWS region name to use for API calls"
 67 | )
 68 | 
 69 | # Get log events command
 70 | get_events_parser = subparsers.add_parser(
 71 |     "get-events", help="Get log events from a specific log stream"
 72 | )
 73 | get_events_parser.add_argument("log_group_name", help="The name of the log group")
 74 | get_events_parser.add_argument("log_stream_name", help="The name of the log stream")
 75 | get_events_parser.add_argument(
 76 |     "--profile", help="AWS profile name to use for credentials"
 77 | )
 78 | get_events_parser.add_argument("--region", help="AWS region name to use for API calls")
 79 | 
 80 | # Get log sample command
 81 | sample_parser = subparsers.add_parser(
 82 |     "sample", help="Get a sample of recent logs from a log group"
 83 | )
 84 | sample_parser.add_argument("log_group_name", help="The name of the log group")
 85 | sample_parser.add_argument(
 86 |     "--limit", type=int, default=10, help="Number of logs to sample (default: 10)"
 87 | )
 88 | sample_parser.add_argument("--profile", help="AWS profile name to use for credentials")
 89 | sample_parser.add_argument("--region", help="AWS region name to use for API calls")
 90 | 
 91 | # Get recent errors command
 92 | errors_parser = subparsers.add_parser(
 93 |     "recent-errors", help="Get recent error logs from a log group"
 94 | )
 95 | errors_parser.add_argument(
 96 |     "log_group_name", help="The name of the log group to analyze"
 97 | )
 98 | errors_parser.add_argument(
 99 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
100 | )
101 | errors_parser.add_argument("--profile", help="AWS profile name to use for credentials")
102 | errors_parser.add_argument("--region", help="AWS region name to use for API calls")
103 | 
104 | # Get log metrics command
105 | metrics_parser = subparsers.add_parser(
106 |     "metrics", help="Get log volume metrics for a log group"
107 | )
108 | metrics_parser.add_argument(
109 |     "log_group_name", help="The name of the log group to analyze"
110 | )
111 | metrics_parser.add_argument(
112 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
113 | )
114 | metrics_parser.add_argument("--profile", help="AWS profile name to use for credentials")
115 | metrics_parser.add_argument("--region", help="AWS region name to use for API calls")
116 | 
117 | # Analyze log structure command
118 | structure_parser = subparsers.add_parser(
119 |     "structure", help="Analyze the structure of logs in a log group"
120 | )
121 | structure_parser.add_argument(
122 |     "log_group_name", help="The name of the log group to analyze"
123 | )
124 | structure_parser.add_argument(
125 |     "--profile", help="AWS profile name to use for credentials"
126 | )
127 | structure_parser.add_argument("--region", help="AWS region name to use for API calls")
128 | 
129 | # Get analyze logs prompt command
130 | prompt_parser = subparsers.add_parser(
131 |     "get-prompt", help="Get a prompt for analyzing CloudWatch logs"
132 | )
133 | prompt_parser.add_argument(
134 |     "log_group_name", help="The name of the log group to analyze"
135 | )
136 | prompt_parser.add_argument("--profile", help="AWS profile name to use for credentials")
137 | prompt_parser.add_argument("--region", help="AWS region name to use for API calls")
138 | 
139 | # Get list groups prompt command
140 | list_prompt_parser = subparsers.add_parser(
141 |     "list-prompt", help="Get a prompt for listing CloudWatch log groups"
142 | )
143 | list_prompt_parser.add_argument(
144 |     "--prefix", help="Optional prefix to filter log groups by name"
145 | )
146 | list_prompt_parser.add_argument(
147 |     "--profile", help="AWS profile name to use for credentials"
148 | )
149 | list_prompt_parser.add_argument("--region", help="AWS region name to use for API calls")
150 | 
151 | # Search logs command
152 | search_parser = subparsers.add_parser(
153 |     "search", help="Search for patterns in CloudWatch logs"
154 | )
155 | search_parser.add_argument("log_group_name", help="The name of the log group to search")
156 | search_parser.add_argument(
157 |     "query", help="The search query (CloudWatch Logs Insights syntax)"
158 | )
159 | search_parser.add_argument(
160 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
161 | )
162 | search_parser.add_argument(
163 |     "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
164 | )
165 | search_parser.add_argument(
166 |     "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
167 | )
168 | search_parser.add_argument("--profile", help="AWS profile name to use for credentials")
169 | search_parser.add_argument("--region", help="AWS region name to use for API calls")
170 | 
171 | # Search multiple log groups command
172 | search_multi_parser = subparsers.add_parser(
173 |     "search-multi", help="Search for patterns across multiple CloudWatch log groups"
174 | )
175 | search_multi_parser.add_argument(
176 |     "log_group_names", nargs="+", help="List of log group names to search"
177 | )
178 | search_multi_parser.add_argument(
179 |     "query", help="The search query (CloudWatch Logs Insights syntax)"
180 | )
181 | search_multi_parser.add_argument(
182 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
183 | )
184 | search_multi_parser.add_argument(
185 |     "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
186 | )
187 | search_multi_parser.add_argument(
188 |     "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
189 | )
190 | search_multi_parser.add_argument(
191 |     "--profile", help="AWS profile name to use for credentials"
192 | )
193 | search_multi_parser.add_argument(
194 |     "--region", help="AWS region name to use for API calls"
195 | )
196 | 
197 | # Summarize log activity command
198 | summarize_parser = subparsers.add_parser(
199 |     "summarize", help="Generate a summary of log activity"
200 | )
201 | summarize_parser.add_argument(
202 |     "log_group_name", help="The name of the log group to analyze"
203 | )
204 | summarize_parser.add_argument(
205 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
206 | )
207 | summarize_parser.add_argument(
208 |     "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
209 | )
210 | summarize_parser.add_argument(
211 |     "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
212 | )
213 | summarize_parser.add_argument(
214 |     "--profile", help="AWS profile name to use for credentials"
215 | )
216 | summarize_parser.add_argument("--region", help="AWS region name to use for API calls")
217 | 
218 | # Find error patterns command
219 | find_errors_parser = subparsers.add_parser(
220 |     "find-errors", help="Find common error patterns in logs"
221 | )
222 | find_errors_parser.add_argument(
223 |     "log_group_name", help="The name of the log group to analyze"
224 | )
225 | find_errors_parser.add_argument(
226 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
227 | )
228 | find_errors_parser.add_argument(
229 |     "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
230 | )
231 | find_errors_parser.add_argument(
232 |     "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
233 | )
234 | find_errors_parser.add_argument("--profile", help="AWS profile name to use for credentials")
235 | find_errors_parser.add_argument("--region", help="AWS region name to use for API calls")
236 | 
237 | # Correlate logs command
238 | correlate_parser = subparsers.add_parser(
239 |     "correlate", help="Correlate logs across multiple AWS services"
240 | )
241 | correlate_parser.add_argument(
242 |     "log_group_names", nargs="+", help="List of log group names to search"
243 | )
244 | correlate_parser.add_argument("search_term", help="Term to search for in logs")
245 | correlate_parser.add_argument(
246 |     "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
247 | )
248 | correlate_parser.add_argument(
249 |     "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
250 | )
251 | correlate_parser.add_argument(
252 |     "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
253 | )
254 | correlate_parser.add_argument(
255 |     "--profile", help="AWS profile name to use for credentials"
256 | )
257 | correlate_parser.add_argument("--region", help="AWS region name to use for API calls")
258 | 
259 | 
260 | def add_aws_config_args(tool_args, args):
261 |     """Add profile and region arguments to tool calls if specified."""
262 |     if args.profile:
263 |         tool_args["profile"] = args.profile
264 |     if args.region:
265 |         tool_args["region"] = args.region
266 |     return tool_args
267 | 
268 | 
269 | async def main():
270 |     """Main function to run the CloudWatch Logs MCP client."""
271 |     args = parser.parse_args()
272 | 
273 |     # Resolve the server script path relative to this file's directory
274 |     script_dir = os.path.dirname(os.path.abspath(__file__))
275 |     server_path = os.path.join(script_dir, "cw-mcp-server", "server.py")
276 | 
277 |     # Prepare server arguments, forwarding AWS profile/region to the server
278 |     server_args = [server_path]
279 |     if args.profile:
280 |         server_args.extend(["--profile", args.profile])
281 |     if args.region:
282 |         server_args.extend(["--region", args.region])
283 | 
284 |     # Create server parameters
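    |     # NOTE: assumes "python3" is on PATH; sys.executable would pin the
    |     # exact interpreter running this client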
285 |     server_params = StdioServerParameters(command="python3", args=server_args, env=None)
286 | 
287 |     # Connect to the server
288 |     async with stdio_client(server_params) as (read, write):
289 |         async with ClientSession(read, write) as session:
290 |             # Initialize the client session
291 |             await session.initialize()
292 | 
293 |             # Check if a command was specified
294 |             if args.command is None:
295 |                 parser.print_help()
296 |                 return
297 | 
298 |             try:
299 |                 # Execute the requested command
300 |                 if args.command == "list-groups":
301 |                     if args.use_tool:
302 |                         # Use the tool interface
303 |                         tool_args = {}
304 |                         if args.prefix:
305 |                             tool_args["prefix"] = args.prefix
306 |                         if args.limit:
307 |                             tool_args["limit"] = args.limit
308 |                         if args.next_token:
309 |                             tool_args["next_token"] = args.next_token
310 |                         tool_args = add_aws_config_args(tool_args, args)
311 | 
312 |                         result = await session.call_tool(
313 |                             "list_log_groups", arguments=tool_args
314 |                         )
315 |                         print_json_response(result)
316 |                     else:
317 |                         # Use the resource interface
318 |                         # Choose the resource URI based on the options provided
319 |                         if args.prefix:
320 |                             # If prefix is provided, use the filtered endpoint
321 |                             resource_uri = f"logs://groups/filter/{args.prefix}"
322 |                         else:
323 |                             resource_uri = "logs://groups"
324 | 
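    |                         # Resource reads unpack into two parts; keep the
    |                         # first and let print_json_response normalize it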
325 |                         content, _ = await session.read_resource(resource_uri)
326 |                         print_json_response(content)
327 | 
328 |                 elif args.command == "group-details":
329 |                     resource_uri = f"logs://groups/{args.log_group_name}"
330 |                     content, _ = await session.read_resource(resource_uri)
331 |                     print_json_response(content)
332 | 
333 |                 elif args.command == "list-streams":
334 |                     resource_uri = f"logs://groups/{args.log_group_name}/streams"
335 |                     content, _ = await session.read_resource(resource_uri)
336 |                     print_json_response(content)
337 | 
338 |                 elif args.command == "get-events":
339 |                     resource_uri = f"logs://groups/{args.log_group_name}/streams/{args.log_stream_name}"
340 |                     content, _ = await session.read_resource(resource_uri)
341 |                     print_json_response(content)
342 | 
343 |                 elif args.command == "sample":
344 |                     resource_uri = (
345 |                         f"logs://groups/{args.log_group_name}/sample?limit={args.limit}"
346 |                     )
347 |                     content, _ = await session.read_resource(resource_uri)
348 |                     print_json_response(content)
349 | 
350 |                 elif args.command == "recent-errors":
351 |                     resource_uri = f"logs://groups/{args.log_group_name}/recent-errors?hours={args.hours}"
352 |                     content, _ = await session.read_resource(resource_uri)
353 |                     print_json_response(content)
354 | 
355 |                 elif args.command == "metrics":
356 |                     resource_uri = f"logs://groups/{args.log_group_name}/metrics?hours={args.hours}"
357 |                     content, _ = await session.read_resource(resource_uri)
358 |                     print_json_response(content)
359 | 
360 |                 elif args.command == "structure":
361 |                     resource_uri = f"logs://groups/{args.log_group_name}/structure"
362 |                     content, _ = await session.read_resource(resource_uri)
363 |                     print_json_response(content)
364 | 
365 |                 elif args.command == "get-prompt":
366 |                     # Get the analyze logs prompt from the server
367 |                     arguments = {"log_group_name": args.log_group_name}
368 |                     arguments = add_aws_config_args(arguments, args)
369 |                     result = await session.get_prompt(
370 |                         "analyze_cloudwatch_logs",
371 |                         arguments=arguments,
372 |                     )
373 | 
374 |                     # Extract and print the prompt text
375 |                     prompt_messages = result.messages
376 |                     if prompt_messages:
377 |                         message = prompt_messages[0]
378 |                         if hasattr(message, "content") and hasattr(
379 |                             message.content, "text"
380 |                         ):
381 |                             print(message.content.text)
382 |                         else:
383 |                             print(
384 |                                 json.dumps(
385 |                                     message, default=lambda x: x.__dict__, indent=2
386 |                                 )
387 |                             )
388 |                     else:
389 |                         print("No prompt received.")
390 | 
391 |                 elif args.command == "list-prompt":
392 |                     # Get arguments for the prompt
393 |                     arguments = {}
394 |                     if args.prefix:
395 |                         arguments["prefix"] = args.prefix
396 |                     arguments = add_aws_config_args(arguments, args)
397 | 
398 |                     # Get the list logs prompt from the server
399 |                     result = await session.get_prompt(
400 |                         "list_cloudwatch_log_groups", arguments=arguments
401 |                     )
402 | 
403 |                     # Extract and print the prompt text
404 |                     prompt_messages = result.messages
405 |                     if prompt_messages:
406 |                         message = prompt_messages[0]
407 |                         if hasattr(message, "content") and hasattr(
408 |                             message.content, "text"
409 |                         ):
410 |                             print(message.content.text)
411 |                         else:
412 |                             print(
413 |                                 json.dumps(
414 |                                     message, default=lambda x: x.__dict__, indent=2
415 |                                 )
416 |                             )
417 |                     else:
418 |                         print("No prompt received.")
419 | 
420 |                 elif args.command == "search":
421 |                     tool_args = {
422 |                         "log_group_name": args.log_group_name,
423 |                         "query": args.query,
424 |                     }
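    |                     # Explicit --start-time/--end-time take precedence over
    |                     # the relative --hours lookback (same for every tool below)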
425 |                     if args.start_time:
426 |                         tool_args["start_time"] = args.start_time
427 |                     if args.end_time:
428 |                         tool_args["end_time"] = args.end_time
429 |                     if not (args.start_time or args.end_time):
430 |                         tool_args["hours"] = args.hours
431 |                     tool_args = add_aws_config_args(tool_args, args)
432 |                     result = await session.call_tool(
433 |                         "search_logs",
434 |                         arguments=tool_args,
435 |                     )
436 |                     print_json_response(result)
437 | 
438 |                 elif args.command == "search-multi":
439 |                     tool_args = {
440 |                         "log_group_names": args.log_group_names,
441 |                         "query": args.query,
442 |                     }
443 |                     if args.start_time:
444 |                         tool_args["start_time"] = args.start_time
445 |                     if args.end_time:
446 |                         tool_args["end_time"] = args.end_time
447 |                     if not (args.start_time or args.end_time):
448 |                         tool_args["hours"] = args.hours
449 |                     tool_args = add_aws_config_args(tool_args, args)
450 |                     result = await session.call_tool(
451 |                         "search_logs_multi",
452 |                         arguments=tool_args,
453 |                     )
454 |                     print_json_response(result)
455 | 
456 |                 elif args.command == "summarize":
457 |                     tool_args = {
458 |                         "log_group_name": args.log_group_name,
459 |                     }
460 |                     if args.start_time:
461 |                         tool_args["start_time"] = args.start_time
462 |                     if args.end_time:
463 |                         tool_args["end_time"] = args.end_time
464 |                     if not (args.start_time or args.end_time):
465 |                         tool_args["hours"] = args.hours
466 |                     tool_args = add_aws_config_args(tool_args, args)
467 |                     result = await session.call_tool(
468 |                         "summarize_log_activity",
469 |                         arguments=tool_args,
470 |                     )
471 |                     print_json_response(result)
472 | 
473 |                 elif args.command == "find-errors":
474 |                     tool_args = {
475 |                         "log_group_name": args.log_group_name,
476 |                     }
477 |                     if args.start_time:
478 |                         tool_args["start_time"] = args.start_time
479 |                     if args.end_time:
480 |                         tool_args["end_time"] = args.end_time
481 |                     if not (args.start_time or args.end_time):
482 |                         tool_args["hours"] = args.hours
483 |                     tool_args = add_aws_config_args(tool_args, args)
484 |                     result = await session.call_tool(
485 |                         "find_error_patterns",
486 |                         arguments=tool_args,
487 |                     )
488 |                     print_json_response(result)
489 | 
490 |                 elif args.command == "correlate":
491 |                     tool_args = {
492 |                         "log_group_names": args.log_group_names,
493 |                         "search_term": args.search_term,
494 |                     }
495 |                     if args.start_time:
496 |                         tool_args["start_time"] = args.start_time
497 |                     if args.end_time:
498 |                         tool_args["end_time"] = args.end_time
499 |                     if not (args.start_time or args.end_time):
500 |                         tool_args["hours"] = args.hours
501 |                     tool_args = add_aws_config_args(tool_args, args)
502 |                     result = await session.call_tool(
503 |                         "correlate_logs",
504 |                         arguments=tool_args,
505 |                     )
506 |                     print_json_response(result)
507 | 
508 |             except Exception as e:
509 |                 print(f"Error: {str(e)}", file=sys.stderr)
510 |                 sys.exit(1)
511 | 
512 | 
513 | def print_json_response(content: str | tuple | object | None):
514 |     """Print JSON content in a formatted way.
515 | 
516 |     Args:
517 |         content: The content to print, which could be:
518 |             - String (direct JSON content)
519 |             - Tuple (from read_resource, where the first element is the content)
520 |             - Object with .content or .text attributes (from CallToolResult)
521 |             - None
522 |     """
523 |     try:
524 |         # Handle None case
525 |         if content is None:
526 |             print("No content received.")
527 |             return
528 | 
529 |         # Session.read_resource responses arrive as a (meta, content) tuple,
530 |         # but in practice the content element is sometimes None
531 |         if isinstance(content, tuple):
532 |             meta, content_text = (
533 |                 content[:2]  # slice defensively: longer tuples would break unpacking
534 |                 if len(content) >= 2
535 |                 else (content[0] if len(content) == 1 else None, None)
536 |             )
537 | 
538 |             # If we have usable content in the second element, use it
539 |             if content_text is not None:
540 |                 content = content_text
541 |             # Otherwise, if meta looks usable, try that
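    |             # (a bare "meta" string is likely the field name leaking through
    |             # from attribute-style unpacking, so treat it as unusable)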
542 |             elif isinstance(meta, str) and meta != "meta":
543 |                 content = meta
544 |             # We don't have usable content in the tuple
545 |             else:
546 |                 print("No usable content found in the response.")
547 |                 return
548 | 
549 |         # Handle object with content attribute (from CallToolResult)
550 |         if hasattr(content, "content"):
551 |             content = content.content
552 | 
553 |         # Handle object with text attribute
554 |         if hasattr(content, "text"):
555 |             content = content.text
556 | 
557 |         # Handle CallToolResult content from mcp_types which can be a list
558 |         if isinstance(content, list) and all(hasattr(item, "text") for item in content):
559 |             # Extract text from each item
560 |             extracted_texts = [item.text for item in content if item.text]
561 |             if extracted_texts:
562 |                 content = extracted_texts[0]  # Use the first text element
563 | 
564 |         # Coerce any remaining non-textual object to its string form
565 |         # (every Python object defines __str__, so isinstance alone suffices)
566 |         if not isinstance(content, (str, bytes, bytearray)):
567 |             content = str(content)
569 | 
570 |         # Try to handle various formats
571 |         if isinstance(content, str):
572 |             try:
573 |                 # Try to parse as JSON
574 |                 parsed = json.loads(content)
575 |                 print(json.dumps(parsed, indent=2))
576 |             except json.JSONDecodeError:
577 |                 # Not valid JSON, just print the string
578 |                 print(content)
579 |         elif isinstance(content, (dict, list)):
580 |             # Direct Python objects
581 |             print(json.dumps(content, indent=2, default=str))
582 |         else:
583 |             # Fall back to string representation
584 |             print(content)
585 | 
586 |     except Exception as e:
587 |         # Catch-all for any unexpected errors
588 |         print(f"Error processing response: {e}")
589 |         print(content)
590 | 
591 | 
592 | if __name__ == "__main__":
593 |     asyncio.run(main())
594 | 
```
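
For reference, a few example invocations of `src/client.py` (the log group names, search term, profile, and region below are placeholders, not values taken from this repository):

```bash
# Summarize the last 6 hours of activity in a hypothetical Lambda log group
python3 src/client.py summarize /aws/lambda/my-function --hours 6 --region us-east-1

# Find error patterns over an explicit ISO8601 window, using a named profile
python3 src/client.py find-errors /aws/lambda/my-function \
    --start-time 2024-06-01T00:00:00Z --end-time 2024-06-01T23:59:59Z --profile dev

# Correlate a request ID across two services
python3 src/client.py correlate /aws/lambda/api-fn /ecs/backend-svc req-12345
```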