# Directory Structure ``` ├── .github │ ├── pull_request_template.md │ └── workflows │ └── pre-commit.yml ├── .gitignore ├── .pre-commit-config.yaml ├── .python-version ├── CODE_OF_CONDUCT.md ├── CONTRIBUTING.md ├── docs │ ├── ai-integration.md │ ├── architecture.md │ ├── assets │ │ ├── claude-desktop-settings.png │ │ ├── claude-server-disconnected.png │ │ └── Log-Analyzer-with-MCP-arch.png │ ├── aws-config.md │ ├── features.md │ ├── troubleshooting.md │ └── usage.md ├── LICENSE ├── NOTICE ├── pyproject.toml ├── README.md ├── src │ ├── client.py │ └── cw-mcp-server │ ├── __init__.py │ ├── resources │ │ ├── __init__.py │ │ └── cloudwatch_logs_resource.py │ ├── server.py │ └── tools │ ├── __init__.py │ ├── analysis_tools.py │ ├── correlation_tools.py │ ├── search_tools.py │ └── utils.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.12 2 | ``` -------------------------------------------------------------------------------- /.pre-commit-config.yaml: -------------------------------------------------------------------------------- ```yaml 1 | repos: 2 | - repo: https://github.com/astral-sh/ruff-pre-commit 3 | rev: v0.11.10 4 | hooks: 5 | - id: ruff-check 6 | args: [--fix] 7 | - id: ruff-format 8 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python 2 | __pycache__/ 3 | *.py[co] 4 | *.so 5 | .Python 6 | build/ 7 | develop-eggs/ 8 | dist/ 9 | downloads/ 10 | eggs/ 11 | .eggs/ 12 | lib/ 13 | lib64/ 14 | parts/ 15 | sdist/ 16 | var/ 17 | wheels/ 18 | *.egg-info/ 19 | .installed.cfg 20 | *.egg 21 | 22 | # Virtual environments 23 | .env 24 | .venv 25 | env/ 26 | venv/ 27 | ENV/ 28 | env.bak/ 29 | venv.bak/ 30 | 31 | # IDE 32 | .idea/ 33 | .vscode/ 34 | *.swp 35 | *.swo 
36 | *~ 37 | 38 | # Testing 39 | .coverage 40 | htmlcov/ 41 | .tox/ 42 | .nox/ 43 | .coverage.* 44 | .cache 45 | nosetests.xml 46 | coverage.xml 47 | *.cover 48 | *.py,cover 49 | .hypothesis/ 50 | .pytest_cache/ 51 | 52 | # Distribution / packaging 53 | .Python 54 | build/ 55 | develop-eggs/ 56 | dist/ 57 | downloads/ 58 | eggs/ 59 | .eggs/ 60 | lib/ 61 | lib64/ 62 | parts/ 63 | sdist/ 64 | var/ 65 | wheels/ 66 | *.egg-info/ 67 | .installed.cfg 68 | *.egg 69 | 70 | # Jupyter Notebook 71 | .ipynb_checkpoints 72 | 73 | # pyenv 74 | .python-version 75 | 76 | # ruff 77 | .ruff_cache/ 78 | 79 | # mypy 80 | .mypy_cache/ 81 | .dmypy.json 82 | dmypy.json 83 | 84 | # OS specific 85 | .DS_Store 86 | .DS_Store? 87 | ._* 88 | .Spotlight-V100 89 | .Trashes 90 | ehthumbs.db 91 | Thumbs.db 92 | 93 | # Logs 94 | *.log 95 | logs/ 96 | log/ 97 | 98 | # Local development 99 | .env 100 | .env.local 101 | .env.*.local 102 | 103 | # Misc 104 | *.bak 105 | *.tmp 106 | *.temp ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Log Analyzer with MCP 2 | 3 | A [Model Context Protocol (MCP)](https://modelcontextprotocol.io) server that provides AI assistants access to AWS CloudWatch Logs for analysis, searching, and correlation. 4 | 5 | ## 🏗️ Architecture 6 |  7 | 8 | ## 🔌 Model Context Protocol (MCP) 9 | 10 | As outlined by Anthropic: 11 | > MCP is an open protocol that standardizes how applications provide context to LLMs. Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect your devices to various peripherals and accessories, MCP provides a standardized way to connect AI models to different data sources and tools. 12 | 13 | This repository is an example client and server that allows an AI assistant like Claude to interact with CloudWatch logs in an AWS account. 
To learn more about MCP, read through the [introduction](https://modelcontextprotocol.io/introduction). 14 | 15 | ## ✨ Features 16 | 17 | - Browse and search CloudWatch Log Groups 18 | - Search logs using CloudWatch Logs Insights query syntax 19 | - Generate log summaries and identify error patterns 20 | - Correlate logs across multiple AWS services 21 | - AI-optimized tools for assistants like Claude 22 | 23 | [Detailed feature list](./docs/features.md) 24 | 25 | ## 🚀 Installation 26 | 27 | ### Prerequisites 28 | 29 | - The [uv](https://github.com/astral-sh/uv) Python package and project manager 30 | - An AWS account with CloudWatch Logs 31 | - Configured [AWS credentials](./docs/aws-config.md) 32 | 33 | 34 | ### Setup 35 | 36 | ```bash 37 | # Clone the repository 38 | git clone https://github.com/awslabs/Log-Analyzer-with-MCP.git 39 | cd Log-Analyzer-with-MCP 40 | 41 | # Create a virtual environment and install dependencies 42 | uv sync 43 | source .venv/bin/activate # On Windows, use `.venv\Scripts\activate` 44 | ``` 45 | 46 | ## 🚦 Quick Start 47 | 48 | 1. Make sure to have configured your AWS credentials as [described here](./docs/aws-config.md) 49 | 50 | 2. Update your `claude_desktop_config.json` file with the proper configuration outlined in the [AI integration guide](./docs/ai-integration.md) 51 | 52 | 3. Open Claude for Desktop and start chatting! 53 | 54 | For more examples and advanced usage, see the [detailed usage guide](./docs/usage.md). 55 | 56 | ## 🤖 AI Integration 57 | 58 | This project can be easily integrated with AI assistants like Claude for Desktop. See the [AI integration guide](./docs/ai-integration.md) for details. 
59 | 60 | ## 📚 Documentation 61 | 62 | - [Detailed Features](./docs/features.md) 63 | - [Usage Guide](./docs/usage.md) 64 | - [AWS Configuration](./docs/aws-config.md) 65 | - [Architecture Details](./docs/architecture.md) 66 | - [AI Integration](./docs/ai-integration.md) 67 | - [Troubleshooting](./docs/troubleshooting.md) 68 | 69 | ## 🔒 Security 70 | 71 | See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information. 72 | 73 | ## 📄 License 74 | 75 | This project is licensed under the Apache-2.0 License. ``` -------------------------------------------------------------------------------- /CODE_OF_CONDUCT.md: -------------------------------------------------------------------------------- ```markdown 1 | ## Code of Conduct 2 | This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). 3 | For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact 4 | [email protected] with any additional questions or comments. 5 | ``` -------------------------------------------------------------------------------- /CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown 1 | # Contributing Guidelines 2 | 3 | Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional 4 | documentation, we greatly value feedback and contributions from our community. 5 | 6 | Please read through this document before submitting any issues or pull requests to ensure we have all the necessary 7 | information to effectively respond to your bug report or contribution. 8 | 9 | 10 | ## Reporting Bugs/Feature Requests 11 | 12 | We welcome you to use the GitHub issue tracker to report bugs or suggest features. 13 | 14 | When filing an issue, please check existing open, or recently closed, issues to make sure somebody else hasn't already 15 | reported the issue. 
Please try to include as much information as you can. Details like these are incredibly useful: 16 | 17 | * A reproducible test case or series of steps 18 | * The version of our code being used 19 | * Any modifications you've made relevant to the bug 20 | * Anything unusual about your environment or deployment 21 | 22 | 23 | ## Contributing via Pull Requests 24 | Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: 25 | 26 | 1. You are working against the latest source on the *main* branch. 27 | 2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already. 28 | 3. You open an issue to discuss any significant work - we would hate for your time to be wasted. 29 | 30 | To send us a pull request, please: 31 | 32 | 1. Fork the repository. 33 | 2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change. 34 | 3. Ensure local tests pass. 35 | 4. Commit to your fork using clear commit messages. 36 | 5. Send us a pull request, answering any default questions in the pull request interface. 37 | 6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation. 38 | 39 | GitHub provides additional documentation on [forking a repository](https://help.github.com/articles/fork-a-repo/) and 40 | [creating a pull request](https://help.github.com/articles/creating-a-pull-request/). 41 | 42 | 43 | ## Finding contributions to work on 44 | Looking at the existing issues is a great way to find something to contribute to. As our projects, by default, use the default GitHub issue labels (enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any 'help wanted' issues is a great place to start.
45 | 46 | 47 | ## Code of Conduct 48 | This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). 49 | For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact 50 | [email protected] with any additional questions or comments. 51 | 52 | 53 | ## Security issue notifications 54 | If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue. 55 | 56 | 57 | ## Licensing 58 | 59 | See the [LICENSE](LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. 60 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/resources/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "Log-Analyzer-with-MCP" 3 | version = "0.1.0" 4 | description = "An MCP server that provides AI assistants access to CloudWatch Logs" 5 | readme = "README.md" 6 | requires-python = ">=3.12" 7 | authors = [ 8 | {name = "Aditya Addepalli", email = "[email protected]"}, 9 | ] 10 | dependencies = [ 11 | "boto3>=1.37.11", 12 | "mcp[cli]>=1.6.0", 13 | ] 14 | 15 | [dependency-groups] 16 | dev = [ 17 | "ruff>=0.11.10", 18 | ] 19 | ``` -------------------------------------------------------------------------------- /.github/workflows/pre-commit.yml: 
-------------------------------------------------------------------------------- ```yaml 1 | name: pre-commit 2 | 3 | on: 4 | pull_request: 5 | push: 6 | branches: [main] 7 | permissions: {} 8 | jobs: 9 | pre-commmit: 10 | runs-on: ubuntu-latest 11 | steps: 12 | - uses: actions/checkout@v4 13 | - uses: actions/setup-python@v5 14 | with: 15 | python-version: "3.12" 16 | cache: "pip" 17 | - name: pre-commit setup 18 | run: pip install pre-commit 19 | - name: run linter 20 | run: pre-commit run --all-files 21 | ``` -------------------------------------------------------------------------------- /docs/features.md: -------------------------------------------------------------------------------- ```markdown 1 | # Detailed Features 2 | 3 | ## 🔍 Log Group Discovery 4 | - Browse available CloudWatch Log Groups 5 | - Filter log groups by name prefix 6 | - Paginate through large sets of log groups 7 | 8 | ## 🔎 Search Capabilities 9 | - Search for specific error patterns or anomalies in CloudWatch Logs 10 | - Apply CloudWatch Logs Insights query syntax for powerful filtering 11 | - Limit results to specific time ranges 12 | 13 | ## 📊 Log Analysis 14 | - Generate summaries of log activity over time 15 | - Identify common error patterns and exceptions 16 | - Analyze log structure and format automatically 17 | - View frequency of different error types 18 | 19 | ## 🔄 Cross-Service Correlation 20 | - Correlate logs across multiple AWS services 21 | - Track request flows through distributed systems 22 | - Identify related events across different log groups 23 | 24 | ## 🤖 AI-Optimized Tools 25 | - Direct tool access for AI assistants 26 | - Specialized prompts for guided log analysis 27 | - Structured data responses for easy consumption ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/tools/__init__.py: -------------------------------------------------------------------------------- ```python 1 | # Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. 2 | # SPDX-License-Identifier: Apache-2.0 3 | 4 | import functools 5 | import json 6 | import traceback 7 | from typing import Callable 8 | 9 | 10 | def handle_exceptions(func: Callable) -> Callable: 11 | """ 12 | Decorator for handling exceptions in tool methods. 13 | Ensures that all tool methods return a standardized error response 14 | rather than raising exceptions that would cause the client to fail. 15 | """ 16 | 17 | @functools.wraps(func) 18 | async def wrapper(*args, **kwargs): 19 | try: 20 | return await func(*args, **kwargs) 21 | except Exception as e: 22 | error_traceback = traceback.format_exc() 23 | error_response = { 24 | "status": "Error", 25 | "error": str(e), 26 | "error_type": e.__class__.__name__, 27 | "details": error_traceback.split("\n")[-2] if error_traceback else None, 28 | } 29 | return json.dumps(error_response, indent=2) 30 | 31 | return wrapper 32 | ``` -------------------------------------------------------------------------------- /.github/pull_request_template.md: -------------------------------------------------------------------------------- ```markdown 1 | <!-- markdownlint-disable MD041 MD043 --> 2 | **Issue number:** 3 | 4 | ## Summary 5 | 6 | ### Changes 7 | 8 | > Please provide a summary of what's being changed 9 | 10 | ### User experience 11 | 12 | > Please share what the user experience looks like before and after this change 13 | 14 | ## Checklist 15 | 16 | If your change doesn't seem to apply, please leave them unchecked. 
17 | 18 | * [ ] I have reviewed the [contributing guidelines](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/CONTRIBUTING.md) 19 | * [ ] I have performed a self-review of this change 20 | * [ ] Changes have been tested 21 | * [ ] Changes are documented 22 | 23 | <details> 24 | <summary>Is this a breaking change?</summary> 25 | 26 | **RFC issue number**: 27 | 28 | Checklist: 29 | 30 | * [ ] Migration process documented 31 | * [ ] Implement warnings (if it can live side by side) 32 | 33 | </details> 34 | 35 | ## Acknowledgment 36 | 37 | By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of the [project license](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/LICENSE). 38 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/tools/utils.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | 3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 | # SPDX-License-Identifier: Apache-2.0 5 | 6 | from datetime import datetime, timedelta 7 | import dateutil.parser 8 | 9 | 10 | def get_time_range(hours: int, start_time: str = None, end_time: str = None): 11 | """ 12 | Calculate time range timestamps from hours or exact start/end times. 
13 | 14 | Args: 15 | hours: Number of hours to look back (used if start_time is not provided) 16 | start_time: Optional ISO8601 start time 17 | end_time: Optional ISO8601 end time 18 | 19 | Returns: 20 | Tuple of (start_timestamp, end_timestamp) in milliseconds since epoch 21 | """ 22 | if start_time: 23 | start_ts = int(dateutil.parser.isoparse(start_time).timestamp() * 1000) 24 | else: 25 | start_ts = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000) 26 | 27 | if end_time: 28 | end_ts = int(dateutil.parser.isoparse(end_time).timestamp() * 1000) 29 | else: 30 | end_ts = int(datetime.now().timestamp() * 1000) 31 | 32 | return start_ts, end_ts 33 | ``` -------------------------------------------------------------------------------- /docs/architecture.md: -------------------------------------------------------------------------------- ```markdown 1 | # 🏗️ Architecture Details 2 | 3 | The following diagram illustrates the high-level architecture of the MCP CloudWatch Log Analyzer: 4 | 5 |  6 | 7 | The architecture consists of three main components: 8 | 9 | ## 💻 Client Side 10 | - AWS Credentials are configured on the client machine 11 | - The local computer runs the MCP client applications 12 | - MCP Server runs locally and manages the communication 13 | 14 | ## ☁️ AWS Cloud 15 | - CloudWatch service provides the log data and search capabilities 16 | 17 | ## 🔄 Data Flow 18 | - AWS Credentials flow from configuration to the client 19 | - The client communicates with CloudWatch through the MCP Server 20 | - The MCP Server mediates all interactions with AWS services 21 | 22 | The project follows the Model Context Protocol architecture: 23 | 24 | ## 📚 Resources 25 | Expose CloudWatch log groups, streams, and events as addressable URIs 26 | - `logs://groups` - List all log groups 27 | - `logs://groups/{log_group_name}/streams` - List streams for a specific group 28 | - `logs://groups/{log_group_name}/streams/{log_stream_name}` - Get events from a stream 29 
| 30 | ## 🧰 Tools 31 | Provide functionality for log analysis, search, and correlation 32 | - `list_log_groups` - List and filter available log groups 33 | - `search_logs` - Search logs with CloudWatch Logs Insights queries 34 | - `summarize_log_activity` - Generate time-based activity summaries 35 | - `find_error_patterns` - Identify common error patterns 36 | - `correlate_logs` - Find related events across multiple log groups 37 | 38 | ## 💬 Prompts 39 | Guide AI assistants through common workflows 40 | - `list_cloudwatch_log_groups` - Help explore available log groups 41 | - `analyze_cloudwatch_logs` - Guide log analysis process 42 | 43 | ## 🖥️ Server 44 | Handles MCP protocol communication and AWS API integration 45 | - Manages API access to CloudWatch Logs 46 | - Handles asynchronous CloudWatch Logs Insights queries 47 | - Provides structured data responses 48 | 49 | ## 📱 Client 50 | Command-line interface for interacting with the server 51 | - Parses commands and arguments 52 | - Connects to the server via stdio 53 | - Formats JSON responses for human readability ``` -------------------------------------------------------------------------------- /docs/aws-config.md: -------------------------------------------------------------------------------- ```markdown 1 | # 🔐 AWS Configuration Guide 2 | 3 | For the MCP server to access your AWS CloudWatch Logs, you need to configure AWS credentials, which you can learn how to do [here](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html). The server uses boto3's credential resolution chain, which checks several locations in the following order: 4 | 5 | 1. **Environment variables**: 6 | ```bash 7 | export AWS_ACCESS_KEY_ID="your-access-key" 8 | export AWS_SECRET_ACCESS_KEY="your-secret-key" 9 | export AWS_DEFAULT_REGION="us-east-1" 10 | ``` 11 | 12 | 2.
**Shared credential file** (`~/.aws/credentials`): 13 | ```ini 14 | [default] 15 | aws_access_key_id = your-access-key 16 | aws_secret_access_key = your-secret-key 17 | ``` 18 | 19 | If you're seeing errors like `An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied`, make sure to add your credentials in this format: 20 | ```ini 21 | [default] 22 | aws_access_key_id = your-access-key 23 | aws_secret_access_key = your-secret-key 24 | 25 | # For temporary credentials, add the session token 26 | [temp-profile] 27 | aws_access_key_id = your-temp-access-key 28 | aws_secret_access_key = your-temp-secret-key 29 | aws_session_token = your-session-token 30 | ``` 31 | 32 | Check out the [troubleshooting guide](./troubleshooting.md) for more information. 33 | 34 | 3. **AWS config file** (`~/.aws/config`): 35 | ```ini 36 | [default] 37 | region = us-east-1 38 | ``` 39 | 40 | You can set up your AWS credentials using the AWS CLI: 41 | 42 | ```bash 43 | aws configure 44 | ``` 45 | 46 | ## Using a Specific AWS Profile or Region 47 | 48 | 1. **Server Start-up** 49 | 50 | If you have multiple AWS profiles or want to specify a region, use: 51 | 52 | ```bash 53 | python src/cw-mcp-server/server.py --profile your-profile-name --region us-west-2 54 | ``` 55 | 56 | 2. **Per-Call Override** 57 | 58 | Override the profile or region on individual AI prompts or tool calls: 59 | 60 | > Example: Get a list of CloudWatch log groups using the "dev-account" profile in "eu-central-1" region. 61 | 62 | Once you set a profile or region, the LLM keeps using it for follow-ups. Only specify a new profile or region when you need to switch accounts or regions. 63 | 64 | This is useful when you need to access CloudWatch logs in different AWS accounts or regions. 65 | 66 | ## 🛡️ Required Permissions 67 | 68 | The MCP server requires permissions to access CloudWatch Logs. 
At minimum, ensure your IAM user or role has the following policy: 69 | - `CloudWatchLogsReadOnlyAccess` 70 | ``` -------------------------------------------------------------------------------- /docs/troubleshooting.md: -------------------------------------------------------------------------------- ```markdown 1 | # 🔧 Troubleshooting Guide 2 | 3 | You may run into various issues while setting this up. Here are some troubleshooting tips: 4 | 5 | ## ⚠️ Common Issues 6 | 7 | **Server Disconnected**: 8 | ``` 9 | MCP cw-mcp-server: Server Disconnected. 10 | ``` 11 | 1. Ensure the JSON configuration described in the [AI integration guide](./ai-integration.md) is set up correctly. 12 | 2. Ensure you've set up your AWS credentials according to the [AWS configuration guide](./aws-config.md). 13 | 14 | **Authentication Errors**: 15 | ``` 16 | Error: An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied 17 | ``` 18 | Ensure your AWS credentials are properly configured and have the necessary permissions to access CloudWatch Logs: 19 | 20 | 1. Check if your credentials file exists: 21 | ```bash 22 | cat ~/.aws/credentials 23 | ``` 24 | 25 | 2. Verify that the role you have assumed has the required permissions (`CloudWatchLogsReadOnlyAccess`). 26 | 27 | 3. If using temporary credentials, ensure your session token is included in your `~/.aws/credentials` file: 28 | ```ini 29 | [profile-name] 30 | aws_access_key_id = your-temp-access-key 31 | aws_secret_access_key = your-temp-secret-key 32 | aws_session_token = your-session-token 33 | ``` 34 | 35 | 4. Test your credentials directly with the AWS CLI by calling the same operation that failed: 36 | ```bash 37 | aws logs describe-log-groups --limit 1 38 | ``` 39 | 40 | **Resource Not Found**: 41 | ``` 42 | Error: An error occurred (ResourceNotFoundException) when calling the GetLogEvents operation 43 | ``` 44 | Check that the log group and stream names are correct. Log stream names are case sensitive.
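The name check for the Resource Not Found case can also be scripted. The following is a minimal sketch and not part of this repository: the helper names `log_group_exists` and `log_stream_exists` are hypothetical, and `logs_client` is assumed to be a boto3 CloudWatch Logs client (for example `boto3.Session().client("logs")`) with valid credentials.

```python
def log_group_exists(logs_client, group_name: str) -> bool:
    """Return True if a log group named exactly `group_name` exists.

    `logs_client` is assumed to be a boto3 CloudWatch Logs client; the
    helper name itself is hypothetical and not part of the server.
    """
    paginator = logs_client.get_paginator("describe_log_groups")
    # The prefix filter narrows the listing; the comparison below is an
    # exact, case-sensitive match on the full name.
    for page in paginator.paginate(logGroupNamePrefix=group_name):
        for group in page.get("logGroups", []):
            if group["logGroupName"] == group_name:
                return True
    return False


def log_stream_exists(logs_client, group_name: str, stream_name: str) -> bool:
    """Return True if `stream_name` exists in `group_name` (exact match)."""
    paginator = logs_client.get_paginator("describe_log_streams")
    pages = paginator.paginate(
        logGroupName=group_name, logStreamNamePrefix=stream_name
    )
    for page in pages:
        for stream in page.get("logStreams", []):
            if stream["logStreamName"] == stream_name:
                return True
    return False
```

If `log_group_exists` returns `False` for a name you expect to exist, compare the spelling and letter case against the output of `list_log_groups`, and confirm that the profile and region in use point at the right account.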
45 | 46 | **Connection Issues**: 47 | ``` 48 | Error: Failed to connect to MCP server 49 | ``` 50 | Verify that the server is running and accessible. Check file paths in your `claude_desktop_config.json` or client configuration. 51 | 52 | **Query Timeout**: 53 | ``` 54 | "status": "Timeout", "error": "Search query failed to complete" 55 | ``` 56 | For complex queries or large log volumes, try reducing the time range using the `--hours` parameter. 57 | 58 | **Claude terminating request**: 59 | 60 | This could be due to a query timeout or invalid response from CloudWatch (for example: performing operations on a non-existent log group). 61 | 62 | In this case, try checking the server logs as outlined in the settings: 63 | 64 |  65 | 66 | Then click `Open Logs Folder` and open the `mcp-server-cw-mcp-server.log` file to see more details. 67 | 68 | **Amazon Q CLI terminating request**: 69 | 70 | If you have issues with your configuration, Amazon Q CLI will start (without any MCP Server Tools) with an error similar to: 71 | ``` 72 | WARNING: Error reading global mcp config: expected value at line 9 column 19 73 | Please check to make sure config is correct. Discarding. 74 | ``` 75 | You might also see timeout issues if it is struggling to find and download the details you have configured in your `mcp.json`, for example: 76 | ``` 77 | x mcp_server has failed to load: 78 | - Operation timed out: recv for initialization 79 | - run with Q_LOG_LEVEL=trace and see $TMPDIR/qlog for detail 80 | x 0 of 1 mcp servers initialized 81 | ``` 82 | You can find the generated logs in `$TMPDIR/qlog`. Set `Q_LOG_LEVEL` to one of `trace`, `debug`, `info`, or `warn` to control the level of debug output while you troubleshoot. 83 | 84 | 85 | ## 🆘 Getting Help 86 | 87 | If you encounter issues not covered in this troubleshooting section, please: 88 | 89 | 1. Check the server logs for detailed error messages 90 | 2.
Verify your AWS permissions and configuration 91 | 92 | If you're still facing issues, please open a GitHub Issue. 93 | ``` -------------------------------------------------------------------------------- /docs/ai-integration.md: -------------------------------------------------------------------------------- ```markdown 1 | # AI Integration Guide 2 | 3 | ## 🖥️ Claude Desktop Integration 4 | 5 | You can add the configuration for the MCP server in Claude for Desktop for AI-assisted log analysis. 6 | 7 | To install Claude for Desktop and learn how to add an MCP server, see [this guide](https://modelcontextprotocol.io/quickstart/user). Add the following to your `claude_desktop_config.json` file. To pin a profile or region, append `"--profile", "your-profile"` and/or `"--region", "us-west-2"` to `args`; otherwise the server uses the profile and region from your AWS configuration: 8 | 9 | ```json 10 | { 11 | "mcpServers": { 12 | "cw-mcp-server": { 13 | "command": "uv", 14 | "args": [ 15 | "--directory", 16 | "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server", 17 | "run", 18 | "server.py" 19 | 20 | ] 21 | } 22 | } 23 | } 24 | ``` 25 | 26 | ## 🤖 Amazon Q CLI Integration 27 | 28 | Amazon Q CLI acts as an MCP Client. To connect to MCP Servers and access the tools they surface, you need to create a configuration file called `mcp.json` in your Amazon Q configuration directory.
29 | 30 | Your directory structure should look like this: 31 | 32 | ```bash 33 | ~/.aws 34 | └── amazonq 35 | ├── mcp.json 36 | ├── profiles 37 | ├── cache 38 | ├── history 39 | └── prompts 40 | ``` 41 | 42 | If `mcp.json` is empty, add the following MCP server configuration to it. To pin a profile or region, append `"--profile", "your-profile"` and/or `"--region", "us-west-2"` to `args`; otherwise the server uses the profile and region from your AWS configuration: 43 | 44 | ```json 45 | { 46 | "mcpServers": { 47 | "cw-mcp-server": { 48 | "command": "uv", 49 | "args": [ 50 | "--directory", 51 | "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server", 52 | "run", 53 | "server.py" 54 | 55 | ] 56 | } 57 | } 58 | } 59 | ``` 60 | 61 | ### Testing the configuration 62 | Every time you start Amazon Q CLI, it will attempt to load any configured MCP Servers. You should see output indicating that the MCP Server has been discovered and initialized. 63 | 64 |  65 | 66 | If you're running into issues, check out the [troubleshooting guide](./troubleshooting.md) or open a GitHub Issue. 67 | 68 | ## 🔍 AI Assistant Capabilities 69 | 70 | With the tools this server provides, AI assistants can: 71 | 72 | 1. **Discover Log Groups**: 73 | - "Show me all my CloudWatch log groups" 74 | - "List log groups that start with /aws/lambda" 75 | - "Show me the next page of log groups" 76 | 77 | 2. **Understand Log Structure**: 78 | - "Analyze the structure of my API Gateway logs" 79 | - "What fields are common in these JSON logs?" 80 | - "Show me a sample of recent logs from this group" 81 | 82 | 3. **Diagnose Issues**: 83 | - "Find all errors in my Lambda logs from the past 24 hours" 84 | - "What's the most common error pattern in this log group?" 85 | - "Show me logs around the time this service crashed" 86 | 87 | 4.
**Perform Analysis**: 88 | - "Compare log volumes between these three services" 89 | - "Find correlations between errors in my database and API logs" 90 | - "Analyze the trend of timeouts in my Lambda function" 91 | 92 | > You can specify a different AWS profile or region in your prompt, e.g. "Show me all my CloudWatch log groups using <profile_name> profile in <region> region" 93 | 94 | ## 💬 AI Prompt Templates 95 | 96 | The server provides specialized prompts that AI assistants can use: 97 | 98 | 1. **List and Explore Log Groups Prompt**: 99 | ``` 100 | I'll help you explore the CloudWatch log groups in your AWS environment. 101 | First, I'll list the available log groups... 102 | ``` 103 | 104 | 2. **Log Analysis Prompt**: 105 | ``` 106 | Please analyze the following CloudWatch logs from the {log_group_name} log group. 107 | First, I'll get you some information about the log group... 108 | ``` 109 | 3. **Profile/Region Override**: 110 | ``` 111 | I'll help you list CloudWatch log groups using the <profile_name> profile in the <region> region. Let me do that for you: 112 | ``` 113 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/tools/correlation_tools.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | 3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 | # SPDX-License-Identifier: Apache-2.0 5 | 6 | import asyncio 7 | import boto3 8 | import json 9 | import time 10 | from datetime import datetime 11 | from typing import List 12 | 13 | from . import handle_exceptions 14 | from .utils import get_time_range 15 | 16 | 17 | class CloudWatchLogsCorrelationTools: 18 | """Tools for correlating logs across multiple CloudWatch Log groups.""" 19 | 20 | def __init__(self, profile_name=None, region_name=None): 21 | """Initialize the CloudWatch Logs client. 
22 | 23 | Args: 24 | profile_name: Optional AWS profile name to use for credentials 25 | region_name: Optional AWS region name to use for API calls 26 | """ 27 | # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain 28 | self.profile_name = profile_name 29 | self.region_name = region_name 30 | session = boto3.Session(profile_name=profile_name, region_name=region_name) 31 | self.logs_client = session.client("logs") 32 | 33 | @handle_exceptions 34 | async def correlate_logs( 35 | self, 36 | log_group_names: List[str], 37 | search_term: str, 38 | hours: int = 24, 39 | start_time: str = None, 40 | end_time: str = None, 41 | ) -> str: 42 | """ 43 | Correlate logs across multiple AWS services using a common search term. 44 | 45 | Args: 46 | log_group_names: List of log group names to search 47 | search_term: Term to search for in logs (request ID, transaction ID, etc.) 48 | hours: Number of hours to look back 49 | start_time: Start time in ISO8601 format 50 | end_time: End time in ISO8601 format 51 | 52 | Returns: 53 | JSON string with correlated events 54 | """ 55 | start_ts, end_ts = get_time_range(hours, start_time, end_time) 56 | 57 | # Validate inputs 58 | if not log_group_names: 59 | return json.dumps( 60 | {"status": "Error", "error": "No log groups specified"}, indent=2 61 | ) 62 | 63 | if not search_term: 64 | return json.dumps( 65 | {"status": "Error", "error": "No search term specified"}, indent=2 66 | ) 67 | 68 | # Results dictionary 69 | results = { 70 | "timeRange": { 71 | "start": datetime.fromtimestamp(start_ts / 1000).isoformat(), 72 | "end": datetime.fromtimestamp(end_ts / 1000).isoformat(), 73 | "hours": hours, 74 | }, 75 | "searchTerm": search_term, 76 | "logGroups": {}, 77 | "correlatedEvents": [], 78 | } 79 | 80 | # Get relevant logs from each group 81 | for log_group_name in log_group_names: 82 | # Use CloudWatch Logs Insights query 83 | query = f""" 84 | filter @message like "{search_term}" 85 | | 
sort @timestamp asc 86 | | limit 100 87 | """ 88 | 89 | # Start the query 90 | query_start_time = time.time() 91 | start_query_response = self.logs_client.start_query( 92 | logGroupName=log_group_name, 93 | startTime=start_ts, 94 | endTime=end_ts, 95 | queryString=query, 96 | ) 97 | 98 | query_id = start_query_response["queryId"] 99 | 100 | # Poll for query results (a query may report "Scheduled" before it starts running) 101 | response = None 102 | while response is None or response["status"] in ("Scheduled", "Running"): 103 | await asyncio.sleep(1) # Wait before checking again 104 | response = self.logs_client.get_query_results(queryId=query_id) 105 | 106 | # Avoid long-running queries 107 | if response["status"] in ("Scheduled", "Running"): 108 | # Check if we've been running too long (60 seconds) 109 | if time.time() - query_start_time > 60: 110 | response = {"status": "Timeout", "results": []} 111 | break 112 | 113 | # Process results for this log group 114 | log_group_events = [] 115 | 116 | for result in response.get("results", []): 117 | event = {"logGroup": log_group_name, "timestamp": None, "message": None} 118 | 119 | for field in result: 120 | if field["field"] == "@timestamp": 121 | event["timestamp"] = field["value"] 122 | elif field["field"] == "@message": 123 | event["message"] = field["value"] 124 | elif field["field"] == "@logStream": 125 | event["logStream"] = field["value"] 126 | 127 | if event["timestamp"] and event["message"]: 128 | log_group_events.append(event) 129 | results["correlatedEvents"].append(event) 130 | 131 | # Store events for this log group 132 | results["logGroups"][log_group_name] = { 133 | "eventCount": len(log_group_events), 134 | "events": log_group_events, 135 | } 136 | 137 | # Sort all correlated events by timestamp 138 | results["correlatedEvents"] = sorted( 139 | results["correlatedEvents"], key=lambda x: x.get("timestamp", "") 140 | ) 141 | 142 | return json.dumps(results, indent=2) 143 | ``` -------------------------------------------------------------------------------- /docs/usage.md: 
-------------------------------------------------------------------------------- ```markdown 1 | # Detailed Usage Guide 2 | 3 | ## 🌐 Integrated with MCP clients (Claude for Desktop, Cursor, Windsurf, etc.) - recommended way for usage 4 | 5 | AI assistants can use this MCP server through any MCP client. For setup details, see the [AI Integration Guide](./ai-integration.md). 6 | 7 | ## 🖥️ Standalone Server directly 8 | 9 | The MCP server exposes CloudWatch Logs data and analysis tools to AI assistants and MCP clients: 10 | 11 | ```bash 12 | python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2] 13 | ``` 14 | 15 | The server runs in the foreground by default. To run it in the background: 16 | 17 | ```bash 18 | python src/cw-mcp-server/server.py & 19 | ``` 20 | 21 | [Amazon Bedrock AgentCore requires stateless streamable-HTTP servers](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-mcp.html#runtime-mcp-how-it-works) because the Runtime provides session isolation by default. The platform automatically adds a `Mcp-Session-Id` header to any request that lacks one, so MCP clients can maintain connection continuity to the same Amazon Bedrock AgentCore Runtime session. 22 | 23 | The server runs in stateful mode by default. 
To run it in stateless mode, you can use: 24 | 25 | ```bash 26 | python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2] --stateless 27 | ``` 28 | 29 | ## 📟 CLI Client (one off usage) 30 | 31 | The project includes a command-line client for interacting with the MCP server: 32 | 33 | ```bash 34 | # List available log groups 35 | python src/client.py list-groups [--profile your-profile] [--region us-west-2] 36 | 37 | # List log groups with a prefix filter 38 | python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2] 39 | 40 | # Use the tool interface instead of resource 41 | python src/client.py list-groups --use-tool [--region us-west-2] 42 | 43 | # Get a prompt for exploring log groups 44 | python src/client.py list-prompt [--region us-west-2] 45 | 46 | # List log streams in a specific log group 47 | python src/client.py list-streams "/aws/lambda/my-function" [--region us-west-2] 48 | 49 | # Get log events from a specific stream 50 | python src/client.py get-events "/aws/lambda/my-function" "2023/06/01/[$LATEST]abcdef123456" [--region us-west-2] 51 | 52 | # Get a sample of recent logs 53 | python src/client.py sample "/aws/lambda/my-function" [--region us-west-2] 54 | 55 | # Get recent errors 56 | python src/client.py recent-errors "/aws/lambda/my-function" [--region us-west-2] 57 | 58 | # Get log structure analysis 59 | python src/client.py structure "/aws/lambda/my-function" [--region us-west-2] 60 | 61 | # Search logs for a specific pattern 62 | python src/client.py search "/aws/lambda/my-function" "filter @message like 'error'" [--region us-west-2] 63 | 64 | # Generate a summary of log activity 65 | python src/client.py summarize "/aws/lambda/my-function" --hours 48 [--region us-west-2] 66 | 67 | # Find common error patterns 68 | python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2] 69 | 70 | # Correlate logs across multiple services 71 | python src/client.py correlate "/aws/lambda/service1" 
"/aws/lambda/service2" "OrderId: 12345" [--region us-west-2] 72 | ``` 73 | 74 | *You can use --profile and --region with any command to target a specific AWS account or region.* 75 | 76 | ## 🧩 Example Workflows 77 | 78 | ### Finding and analyzing errors in a Lambda function using the standalone server directly 79 | 80 | ```bash 81 | # 1. List your log groups to find the Lambda function 82 | python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2] 83 | 84 | # 2. Generate a summary to see when errors occurred 85 | python src/client.py summarize "/aws/lambda/my-function" --hours 24 [--region us-west-2] 86 | 87 | # 3. Find the most common error patterns 88 | python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2] 89 | 90 | # 4. Search for details about a specific error 91 | python src/client.py search "/aws/lambda/my-function" "filter @message like 'ConnectionError'" [--region us-west-2] 92 | ``` 93 | 94 | ### Correlating requests across microservices using the standalone server directly 95 | 96 | ```bash 97 | # Track a request ID across multiple services 98 | python src/client.py correlate \ 99 | "/aws/lambda/api-gateway" \ 100 | "/aws/lambda/auth-service" \ 101 | "/aws/lambda/payment-processor" \ 102 | "req-abc123" [--region us-west-2] 103 | ``` 104 | 105 | ## 🔗 Resource URIs 106 | 107 | The MCP server exposes CloudWatch Logs data through the following resource URIs: 108 | 109 | | Resource URI | Description | 110 | |--------------|-------------| 111 | | `logs://groups` | List all log groups | 112 | | `logs://groups/filter/{prefix}` | List log groups filtered by prefix | 113 | | `logs://groups/{log_group_name}` | Get details about a specific log group | 114 | | `logs://groups/{log_group_name}/streams` | List streams for a log group | 115 | | `logs://groups/{log_group_name}/streams/{log_stream_name}` | Get events from a specific log stream | 116 | | `logs://groups/{log_group_name}/sample` | Get a sample of recent logs | 117 | 
| `logs://groups/{log_group_name}/recent-errors` | Get recent errors from a log group | 118 | | `logs://groups/{log_group_name}/metrics` | Get log metrics (volume, frequency) | 119 | | `logs://groups/{log_group_name}/structure` | Analyze log format and structure | 120 | 121 | ## 🧰 Tool Handlers 122 | 123 | The server provides the following tool handlers for AI assistants: 124 | 125 | | Tool | Description | 126 | |------|-------------| 127 | | `list_log_groups` | List available CloudWatch log groups with filtering options | 128 | | `search_logs` | Execute CloudWatch Logs Insights queries on a single log group | 129 | | `search_logs_multi` | Execute CloudWatch Logs Insights queries across multiple log groups | 130 | | `filter_log_events` | Filter logs by pattern across all streams | 131 | | `summarize_log_activity` | Generate time-based activity summaries | 132 | | `find_error_patterns` | Identify common error patterns | 133 | | `correlate_logs` | Find related events across multiple log groups | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/tools/search_tools.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | 3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 | # SPDX-License-Identifier: Apache-2.0 5 | 6 | import asyncio 7 | import boto3 8 | import json 9 | import time 10 | from datetime import datetime 11 | from typing import List 12 | 13 | from . import handle_exceptions 14 | from .utils import get_time_range 15 | 16 | 17 | class CloudWatchLogsSearchTools: 18 | """Tools for searching and querying CloudWatch Logs.""" 19 | 20 | def __init__(self, profile_name=None, region_name=None): 21 | """Initialize the CloudWatch Logs client. 
22 | 23 | Args: 24 | profile_name: Optional AWS profile name to use for credentials 25 | region_name: Optional AWS region name to use for API calls 26 | """ 27 | # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain 28 | self.profile_name = profile_name 29 | self.region_name = region_name 30 | session = boto3.Session(profile_name=profile_name, region_name=region_name) 31 | self.logs_client = session.client("logs") 32 | 33 | @handle_exceptions 34 | async def search_logs( 35 | self, 36 | log_group_name: str, 37 | query: str, 38 | hours: int = 24, 39 | start_time: str = None, 40 | end_time: str = None, 41 | ) -> str: 42 | """ 43 | Search logs using CloudWatch Logs Insights query. 44 | 45 | Args: 46 | log_group_name: The log group to search 47 | query: CloudWatch Logs Insights query syntax 48 | hours: Number of hours to look back 49 | start_time: Start time in ISO8601 format 50 | end_time: End time in ISO8601 format 51 | 52 | Returns: 53 | JSON string with search results 54 | """ 55 | return await self.search_logs_multi( 56 | [log_group_name], query, hours, start_time, end_time 57 | ) 58 | 59 | @handle_exceptions 60 | async def search_logs_multi( 61 | self, 62 | log_group_names: List[str], 63 | query: str, 64 | hours: int = 24, 65 | start_time: str = None, 66 | end_time: str = None, 67 | ) -> str: 68 | """ 69 | Search logs across multiple log groups using CloudWatch Logs Insights query. 
70 | 71 | Args: 72 | log_group_names: List of log groups to search 73 | query: CloudWatch Logs Insights query syntax 74 | hours: Number of hours to look back 75 | start_time: Start time in ISO8601 format 76 | end_time: End time in ISO8601 format 77 | 78 | Returns: 79 | JSON string with search results 80 | """ 81 | start_ts, end_ts = get_time_range(hours, start_time, end_time) 82 | # Start the query 83 | query_start_time = time.time() 84 | start_query_response = self.logs_client.start_query( 85 | logGroupNames=log_group_names, 86 | startTime=start_ts, 87 | endTime=end_ts, 88 | queryString=query, 89 | limit=100, 90 | ) 91 | query_id = start_query_response["queryId"] 92 | 93 | # Poll for query results (a query may report "Scheduled" before it starts running) 94 | response = None 95 | while response is None or response["status"] in ("Scheduled", "Running"): 96 | await asyncio.sleep(1) # Wait before checking again 97 | response = self.logs_client.get_query_results(queryId=query_id) 98 | elapsed_time = time.time() - query_start_time 99 | 100 | # Avoid long-running queries 101 | if response["status"] in ("Scheduled", "Running"): 102 | # Check if we've been running too long (60 seconds) 103 | if elapsed_time > 60: 104 | return json.dumps( 105 | { 106 | "status": "Timeout", 107 | "error": "Search query failed to complete within time limit", 108 | }, 109 | indent=2, 110 | ) 111 | 112 | # Process and format the results 113 | formatted_results = { 114 | "status": response["status"], 115 | "statistics": response.get("statistics", {}), 116 | "searchedLogGroups": log_group_names, 117 | "results": [], 118 | } 119 | 120 | for result in response.get("results", []): 121 | result_dict = {} 122 | for field in result: 123 | result_dict[field["field"]] = field["value"] 124 | formatted_results["results"].append(result_dict) 125 | 126 | return json.dumps(formatted_results, indent=2) 127 | 128 | @handle_exceptions 129 | async def filter_log_events( 130 | self, 131 | log_group_name: str, 132 | filter_pattern: str, 133 | hours: int = 24, 134 | start_time: str = None, 135 | 
end_time: str = None, 136 | ) -> str: 137 | """ 138 | Filter log events by pattern across all streams in a log group. 139 | 140 | Args: 141 | log_group_name: The log group to filter 142 | filter_pattern: The pattern to search for (CloudWatch Logs filter syntax) 143 | hours: Number of hours to look back 144 | start_time: Start time in ISO8601 format 145 | end_time: End time in ISO8601 format 146 | 147 | Returns: 148 | JSON string with filtered events 149 | """ 150 | start_ts, end_ts = get_time_range(hours, start_time, end_time) 151 | response = self.logs_client.filter_log_events( 152 | logGroupName=log_group_name, 153 | filterPattern=filter_pattern, 154 | startTime=start_ts, 155 | endTime=end_ts, 156 | limit=100, 157 | ) 158 | 159 | events = response.get("events", []) 160 | formatted_events = [] 161 | 162 | for event in events: 163 | formatted_events.append( 164 | { 165 | "timestamp": datetime.fromtimestamp( 166 | event.get("timestamp", 0) / 1000 167 | ).isoformat(), 168 | "message": event.get("message"), 169 | "logStreamName": event.get("logStreamName"), 170 | } 171 | ) 172 | 173 | return json.dumps(formatted_events, indent=2) 174 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/tools/analysis_tools.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | 3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 | # SPDX-License-Identifier: Apache-2.0 5 | 6 | import asyncio 7 | import boto3 8 | import json 9 | from datetime import datetime 10 | 11 | from . import handle_exceptions 12 | from .utils import get_time_range 13 | 14 | 15 | class CloudWatchLogsAnalysisTools: 16 | """Tools for analyzing CloudWatch Logs data.""" 17 | 18 | def __init__(self, profile_name=None, region_name=None): 19 | """Initialize the CloudWatch Logs client. 
20 | 21 | Args: 22 | profile_name: Optional AWS profile name to use for credentials 23 | region_name: Optional AWS region name to use for API calls 24 | """ 25 | # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain 26 | self.profile_name = profile_name 27 | self.region_name = region_name 28 | session = boto3.Session(profile_name=profile_name, region_name=region_name) 29 | self.logs_client = session.client("logs") 30 | 31 | @handle_exceptions 32 | async def summarize_log_activity( 33 | self, 34 | log_group_name: str, 35 | hours: int = 24, 36 | start_time: str = None, 37 | end_time: str = None, 38 | ) -> str: 39 | """ 40 | Generate a summary of log activity over a specified time period. 41 | 42 | Args: 43 | log_group_name: The log group to analyze 44 | hours: Number of hours to look back 45 | start_time: Start time in ISO8601 format 46 | end_time: End time in ISO8601 format 47 | 48 | Returns: 49 | JSON string with activity summary 50 | """ 51 | start_ts, end_ts = get_time_range(hours, start_time, end_time) 52 | 53 | # Use CloudWatch Logs Insights to get a summary 54 | query = """ 55 | stats count(*) as logEvents, 56 | count_distinct(@logStream) as streams 57 | | sort @timestamp desc 58 | | limit 1000 59 | """ 60 | 61 | # Start the query 62 | start_query_response = self.logs_client.start_query( 63 | logGroupName=log_group_name, 64 | startTime=start_ts, 65 | endTime=end_ts, 66 | queryString=query, 67 | ) 68 | 69 | query_id = start_query_response["queryId"] 70 | 71 | # Poll for query results (a query may report "Scheduled" before it starts running) 72 | response = None 73 | while response is None or response["status"] in ("Scheduled", "Running"): 74 | await asyncio.sleep(1) # Wait before checking again 75 | response = self.logs_client.get_query_results(queryId=query_id) 76 | 77 | # Get the hourly distribution 78 | hourly_query = """ 79 | stats count(*) as count by bin(1h) 80 | | sort @timestamp desc 81 | | limit 24 82 | """ 83 | 84 | # Start the hourly query 85 | hourly_query_response = 
self.logs_client.start_query( 86 | logGroupName=log_group_name, 87 | startTime=start_ts, 88 | endTime=end_ts, 89 | queryString=hourly_query, 90 | ) 91 | 92 | hourly_query_id = hourly_query_response["queryId"] 93 | 94 | # Poll for hourly query results (a query may report "Scheduled" before it starts running) 95 | hourly_response = None 96 | while hourly_response is None or hourly_response["status"] in ("Scheduled", "Running"): 97 | await asyncio.sleep(1) # Wait before checking again 98 | hourly_response = self.logs_client.get_query_results( 99 | queryId=hourly_query_id 100 | ) 101 | 102 | # Process the main summary results 103 | summary = { 104 | "timeRange": { 105 | "start": datetime.fromtimestamp(start_ts / 1000).isoformat(), 106 | "end": datetime.fromtimestamp(end_ts / 1000).isoformat(), 107 | "hours": hours, 108 | }, 109 | "logEvents": 0, 110 | "uniqueStreams": 0, 111 | "hourlyDistribution": [], 112 | } 113 | 114 | # Extract the main stats 115 | for result in response.get("results", []): 116 | for field in result: 117 | if field["field"] == "logEvents": 118 | summary["logEvents"] = int(field["value"]) 119 | elif field["field"] == "streams": 120 | summary["uniqueStreams"] = int(field["value"]) 121 | 122 | # Extract the hourly distribution 123 | for result in hourly_response.get("results", []): 124 | hour_data = {} 125 | for field in result: 126 | if field["field"] == "bin(1h)": 127 | hour_data["hour"] = field["value"] 128 | elif field["field"] == "count": 129 | hour_data["count"] = int(field["value"]) 130 | 131 | if hour_data: 132 | summary["hourlyDistribution"].append(hour_data) 133 | 134 | return json.dumps(summary, indent=2) 135 | 136 | @handle_exceptions 137 | async def find_error_patterns( 138 | self, 139 | log_group_name: str, 140 | hours: int = 24, 141 | start_time: str = None, 142 | end_time: str = None, 143 | ) -> str: 144 | """ 145 | Find common error patterns in logs. 
146 | 147 | Args: 148 | log_group_name: The log group to analyze 149 | hours: Number of hours to look back 150 | start_time: Start time in ISO8601 format 151 | end_time: End time in ISO8601 format 152 | 153 | Returns: 154 | JSON string with error patterns 155 | """ 156 | start_ts, end_ts = get_time_range(hours, start_time, end_time) 157 | 158 | # Query for error logs 159 | error_query = """ 160 | filter @message like /(?i)(error|exception|fail|traceback)/ 161 | | stats count(*) as errorCount by @message 162 | | sort errorCount desc 163 | | limit 20 164 | """ 165 | 166 | # Start the query 167 | start_query_response = self.logs_client.start_query( 168 | logGroupName=log_group_name, 169 | startTime=start_ts, 170 | endTime=end_ts, 171 | queryString=error_query, 172 | ) 173 | 174 | query_id = start_query_response["queryId"] 175 | 176 | # Poll for query results (a query may report "Scheduled" before it starts running) 177 | response = None 178 | while response is None or response["status"] in ("Scheduled", "Running"): 179 | await asyncio.sleep(1) # Wait before checking again 180 | response = self.logs_client.get_query_results(queryId=query_id) 181 | 182 | # Process the results 183 | error_patterns = { 184 | "timeRange": { 185 | "start": datetime.fromtimestamp(start_ts / 1000).isoformat(), 186 | "end": datetime.fromtimestamp(end_ts / 1000).isoformat(), 187 | "hours": hours, 188 | }, 189 | "errorPatterns": [], 190 | } 191 | 192 | for result in response.get("results", []): 193 | pattern = {} 194 | for field in result: 195 | if field["field"] == "@message": 196 | pattern["message"] = field["value"] 197 | elif field["field"] == "errorCount": 198 | pattern["count"] = int(field["value"]) 199 | 200 | if pattern: 201 | error_patterns["errorPatterns"].append(pattern) 202 | 203 | return json.dumps(error_patterns, indent=2) 204 | ``` -------------------------------------------------------------------------------- /src/cw-mcp-server/server.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env 
python3 2 | 3 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 | # SPDX-License-Identifier: Apache-2.0 5 | 6 | import sys 7 | import os 8 | import argparse 9 | from typing import List, Callable, Any, Type, Optional 10 | from functools import wraps 11 | import asyncio 12 | 13 | from mcp.server.fastmcp import FastMCP 14 | from resources.cloudwatch_logs_resource import CloudWatchLogsResource 15 | from tools.search_tools import CloudWatchLogsSearchTools 16 | from tools.analysis_tools import CloudWatchLogsAnalysisTools 17 | from tools.correlation_tools import CloudWatchLogsCorrelationTools 18 | 19 | # Parse command line arguments 20 | parser = argparse.ArgumentParser(description="CloudWatch Logs Analyzer MCP Server") 21 | parser.add_argument( 22 | "--profile", type=str, help="AWS profile name to use for credentials" 23 | ) 24 | parser.add_argument("--region", type=str, help="AWS region name to use for API calls") 25 | parser.add_argument( 26 | "--stateless", action="store_true", help="Stateless HTTP mode", default=False 27 | ) 28 | args, unknown = parser.parse_known_args() 29 | 30 | # Add the current directory to the path so we can import our modules 31 | current_dir = os.path.dirname(os.path.abspath(__file__)) 32 | sys.path.append(current_dir) 33 | 34 | # Create the MCP server for CloudWatch logs 35 | mcp = FastMCP("CloudWatch Logs Analyzer", stateless_http=args.stateless) 36 | 37 | # Initialize our resource and tools classes with the specified AWS profile and region 38 | cw_resource = CloudWatchLogsResource(profile_name=args.profile, region_name=args.region) 39 | search_tools = CloudWatchLogsSearchTools( 40 | profile_name=args.profile, region_name=args.region 41 | ) 42 | analysis_tools = CloudWatchLogsAnalysisTools( 43 | profile_name=args.profile, region_name=args.region 44 | ) 45 | correlation_tools = CloudWatchLogsCorrelationTools( 46 | profile_name=args.profile, region_name=args.region 47 | ) 48 | 49 | # Capture the parsed CLI profile and 
region in separate variables 50 | default_profile = args.profile 51 | default_region = args.region 52 | 53 | 54 | # Helper decorator to handle profile and region parameters for tools 55 | def with_aws_config(tool_class: Type, method_name: Optional[str] = None) -> Callable: 56 | """ 57 | Decorator that handles the profile and region parameters for tool functions. 58 | Creates a new instance of the specified tool class with the correct profile and region. 59 | 60 | Args: 61 | tool_class: The class to instantiate with the profile and region 62 | method_name: Optional method name if different from the decorated function 63 | """ 64 | 65 | def decorator(func: Callable) -> Callable: 66 | @wraps(func) 67 | async def wrapper(*args, **kwargs) -> Any: 68 | try: 69 | profile = kwargs.pop("profile", None) or default_profile 70 | region = kwargs.pop("region", None) or default_region 71 | tool_instance = tool_class(profile_name=profile, region_name=region) 72 | target_method = method_name or func.__name__ 73 | method = getattr(tool_instance, target_method) 74 | result = method(**kwargs) 75 | if asyncio.iscoroutine(result): 76 | return await result 77 | return result 78 | except AttributeError as e: 79 | raise RuntimeError( 80 | f"Method {target_method} not found in {tool_class.__name__}, {e}" 81 | ) from e 82 | except Exception as e: 83 | raise RuntimeError( 84 | f"An error {e} occurred while executing {target_method} in {tool_class.__name__}" 85 | ) from e 86 | 87 | return wrapper 88 | 89 | return decorator 90 | 91 | 92 | # ============================== 93 | # Resource Handlers 94 | # ============================== 95 | 96 | 97 | @mcp.resource("logs://groups") 98 | def get_log_groups() -> str: 99 | """Get a list of all CloudWatch Log Groups""" 100 | # Use default values for parameters 101 | prefix = None 102 | limit = 50 103 | next_token = None 104 | 105 | return cw_resource.get_log_groups(prefix, limit, next_token) 106 | 107 | 108 | 
@mcp.resource("logs://groups/filter/{prefix}") 109 | def get_filtered_log_groups(prefix: str) -> str: 110 | """ 111 | Get a filtered list of CloudWatch Log Groups by prefix 112 | 113 | Args: 114 | prefix: The prefix to filter log groups by 115 | """ 116 | # Default values for other parameters 117 | limit = 50 118 | next_token = None 119 | 120 | return cw_resource.get_log_groups(prefix, limit, next_token) 121 | 122 | 123 | @mcp.resource("logs://groups/{log_group_name}") 124 | def get_log_group_details(log_group_name: str) -> str: 125 | """Get detailed information about a specific log group""" 126 | return cw_resource.get_log_group_details(log_group_name) 127 | 128 | 129 | @mcp.resource("logs://groups/{log_group_name}/streams") 130 | def get_log_streams(log_group_name: str) -> str: 131 | """ 132 | Get a list of log streams for a specific log group 133 | 134 | Args: 135 | log_group_name: The name of the log group 136 | """ 137 | # Use default limit value 138 | limit = 20 139 | return cw_resource.get_log_streams(log_group_name, limit) 140 | 141 | 142 | @mcp.resource("logs://groups/{log_group_name}/streams/{log_stream_name}") 143 | def get_log_events(log_group_name: str, log_stream_name: str) -> str: 144 | """ 145 | Get log events from a specific log stream 146 | 147 | Args: 148 | log_group_name: The name of the log group 149 | log_stream_name: The name of the log stream 150 | """ 151 | # Use default limit value 152 | limit = 100 153 | return cw_resource.get_log_events(log_group_name, log_stream_name, limit) 154 | 155 | 156 | @mcp.resource("logs://groups/{log_group_name}/sample") 157 | def get_log_sample(log_group_name: str) -> str: 158 | """ 159 | Get a sample of recent logs from a log group 160 | 161 | Args: 162 | log_group_name: The name of the log group 163 | """ 164 | # Use default limit value 165 | limit = 10 166 | return cw_resource.get_log_sample(log_group_name, limit) 167 | 168 | 169 | @mcp.resource("logs://groups/{log_group_name}/recent-errors") 170 | def 
get_recent_errors(log_group_name: str) -> str: 171 | """ 172 | Get recent error logs from a log group 173 | 174 | Args: 175 | log_group_name: The name of the log group 176 | """ 177 | # Use default hours value 178 | hours = 24 179 | return cw_resource.get_recent_errors(log_group_name, hours) 180 | 181 | 182 | @mcp.resource("logs://groups/{log_group_name}/metrics") 183 | def get_log_metrics(log_group_name: str) -> str: 184 | """ 185 | Get log volume metrics for a log group 186 | 187 | Args: 188 | log_group_name: The name of the log group 189 | """ 190 | # Use default hours value 191 | hours = 24 192 | return cw_resource.get_log_metrics(log_group_name, hours) 193 | 194 | 195 | @mcp.resource("logs://groups/{log_group_name}/structure") 196 | def analyze_log_structure(log_group_name: str) -> str: 197 | """Analyze and provide information about the structure of logs""" 198 | return cw_resource.analyze_log_structure(log_group_name) 199 | 200 | 201 | # ============================== 202 | # Prompts 203 | # ============================== 204 | 205 | 206 | @mcp.prompt() 207 | def list_cloudwatch_log_groups( 208 | prefix: str = None, profile: str = None, region: str = None 209 | ) -> str: 210 | """ 211 | Prompt for listing and exploring CloudWatch log groups. 212 | 213 | Args: 214 | prefix: Optional prefix to filter log groups by name 215 | profile: Optional AWS profile name to use for credentials 216 | region: Optional AWS region name to use for API calls 217 | """ 218 | profile_text = f" using profile '{profile}'" if profile else "" 219 | region_text = f" in region '{region}'" if region else "" 220 | prefix_text = f" with prefix '{prefix}'" if prefix else "" 221 | 222 | return f"""I'll help you explore the CloudWatch log groups in your AWS environment{profile_text}{region_text}. 223 | 224 | First, I'll list the available log groups{prefix_text}. 225 | 226 | For each log group, I can help you: 227 | 1. Get detailed information about the group (retention, size, etc.) 228 | 2. 
Check for recent errors or patterns 229 | 3. View metrics like volume and activity 230 | 4. Sample recent logs to understand the content 231 | 5. Search for specific patterns or events 232 | 233 | Let me know which log group you'd like to explore further, or if you'd like to refine the search with a different prefix. 234 | """ 235 | 236 | 237 | @mcp.prompt() 238 | def analyze_cloudwatch_logs( 239 | log_group_name: str, profile: str = None, region: str = None 240 | ) -> str: 241 | """ 242 | Prompt for analyzing CloudWatch logs to help identify issues, patterns, and insights. 243 | 244 | Args: 245 | log_group_name: The name of the log group to analyze 246 | profile: Optional AWS profile name to use for credentials 247 | region: Optional AWS region name to use for API calls 248 | """ 249 | profile_text = f" using profile '{profile}'" if profile else "" 250 | region_text = f" in region '{region}'" if region else "" 251 | 252 | return f"""Please analyze the following CloudWatch logs from the {log_group_name} log group{profile_text}{region_text}. 253 | 254 | First, I'll get you some information about the log group: 255 | 1. Get the basic log group structure to understand the format of logs 256 | 2. Check for any recent errors 257 | 3. Examine the log volume metrics 258 | 4. 
Analyze a sample of recent logs 259 | 260 | Based on this information, please: 261 | - Identify any recurring errors or exceptions 262 | - Look for unusual patterns or anomalies 263 | - Suggest possible root causes for any issues found 264 | - Recommend actions to resolve or mitigate problems 265 | - Provide insights on performance or resource utilization 266 | 267 | Feel free to ask for additional context if needed, such as: 268 | - Correlation with logs from other services 269 | - More specific time ranges for analysis 270 | - Queries for specific error messages or events 271 | """ 272 | 273 | 274 | # ============================== 275 | # Tool Handlers 276 | # ============================== 277 | 278 | 279 | @mcp.tool() 280 | @with_aws_config(CloudWatchLogsResource, method_name="get_log_groups") 281 | async def list_log_groups( 282 | prefix: str = None, 283 | limit: int = 50, 284 | next_token: str = None, 285 | profile: str = None, 286 | region: str = None, 287 | ) -> str: 288 | """ 289 | List available CloudWatch log groups with optional filtering by prefix. 290 | 291 | Args: 292 | prefix: Optional prefix to filter log groups by name 293 | limit: Maximum number of log groups to return (default: 50) 294 | next_token: Token for pagination to get the next set of results 295 | profile: Optional AWS profile name to use for credentials 296 | region: Optional AWS region name to use for API calls 297 | 298 | Returns: 299 | JSON string with log groups information 300 | """ 301 | # Function body is handled by the decorator 302 | pass 303 | 304 | 305 | @mcp.tool() 306 | @with_aws_config(CloudWatchLogsSearchTools) 307 | async def search_logs( 308 | log_group_name: str, 309 | query: str, 310 | hours: int = 24, 311 | start_time: str = None, 312 | end_time: str = None, 313 | profile: str = None, 314 | region: str = None, 315 | ) -> str: 316 | """ 317 | Search logs using CloudWatch Logs Insights query. 
318 | 319 | Args: 320 | log_group_name: The log group to search 321 | query: CloudWatch Logs Insights query syntax 322 | hours: Number of hours to look back 323 | start_time: Optional ISO8601 start time 324 | end_time: Optional ISO8601 end time 325 | profile: Optional AWS profile name to use for credentials 326 | region: Optional AWS region name to use for API calls 327 | 328 | Returns: 329 | JSON string with search results 330 | """ 331 | # Function body is handled by the decorator 332 | pass 333 | 334 | 335 | @mcp.tool() 336 | @with_aws_config(CloudWatchLogsSearchTools) 337 | async def search_logs_multi( 338 | log_group_names: List[str], 339 | query: str, 340 | hours: int = 24, 341 | start_time: str = None, 342 | end_time: str = None, 343 | profile: str = None, 344 | region: str = None, 345 | ) -> str: 346 | """ 347 | Search logs across multiple log groups using CloudWatch Logs Insights. 348 | 349 | Args: 350 | log_group_names: List of log groups to search 351 | query: CloudWatch Logs Insights query in Logs Insights syntax 352 | hours: Number of hours to look back (default: 24) 353 | start_time: Optional ISO8601 start time 354 | end_time: Optional ISO8601 end time 355 | profile: Optional AWS profile name to use for credentials 356 | region: Optional AWS region name to use for API calls 357 | 358 | Returns: 359 | JSON string with search results 360 | """ 361 | # Function body is handled by the decorator 362 | pass 363 | 364 | 365 | @mcp.tool() 366 | @with_aws_config(CloudWatchLogsSearchTools) 367 | async def filter_log_events( 368 | log_group_name: str, 369 | filter_pattern: str, 370 | hours: int = 24, 371 | start_time: str = None, 372 | end_time: str = None, 373 | profile: str = None, 374 | region: str = None, 375 | ) -> str: 376 | """ 377 | Filter log events by pattern across all streams in a log group. 
378 | 379 | Args: 380 | log_group_name: The log group to filter 381 | filter_pattern: The pattern to search for (CloudWatch Logs filter syntax) 382 | hours: Number of hours to look back 383 | start_time: Optional ISO8601 start time 384 | end_time: Optional ISO8601 end time 385 | profile: Optional AWS profile name to use for credentials 386 | region: Optional AWS region name to use for API calls 387 | 388 | Returns: 389 | JSON string with filtered events 390 | """ 391 | # Function body is handled by the decorator 392 | pass 393 | 394 | 395 | @mcp.tool() 396 | @with_aws_config(CloudWatchLogsAnalysisTools) 397 | async def summarize_log_activity( 398 | log_group_name: str, 399 | hours: int = 24, 400 | start_time: str = None, 401 | end_time: str = None, 402 | profile: str = None, 403 | region: str = None, 404 | ) -> str: 405 | """ 406 | Generate a summary of log activity over a specified time period. 407 | 408 | Args: 409 | log_group_name: The log group to analyze 410 | hours: Number of hours to look back 411 | start_time: Optional ISO8601 start time 412 | end_time: Optional ISO8601 end time 413 | profile: Optional AWS profile name to use for credentials 414 | region: Optional AWS region name to use for API calls 415 | 416 | Returns: 417 | JSON string with activity summary 418 | """ 419 | # Function body is handled by the decorator 420 | pass 421 | 422 | 423 | @mcp.tool() 424 | @with_aws_config(CloudWatchLogsAnalysisTools) 425 | async def find_error_patterns( 426 | log_group_name: str, 427 | hours: int = 24, 428 | start_time: str = None, 429 | end_time: str = None, 430 | profile: str = None, 431 | region: str = None, 432 | ) -> str: 433 | """ 434 | Find common error patterns in logs. 

    Args:
        log_group_name: The log group to analyze
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with error patterns
    """
    # Function body is handled by the decorator
    pass


@mcp.tool()
@with_aws_config(CloudWatchLogsCorrelationTools)
async def correlate_logs(
    log_group_names: List[str],
    search_term: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Correlate logs across multiple AWS services using a common search term.

    Args:
        log_group_names: List of log group names to search
        search_term: Term to search for in logs (request ID, transaction ID, etc.)
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with correlated events
    """
    # Function body is handled by the decorator
    pass


if __name__ == "__main__":
    # Run the MCP server
    mcp.run()
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/resources/cloudwatch_logs_resource.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import re
from collections import Counter


class CloudWatchLogsResource:
    """Resource class for handling CloudWatch Logs resources."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs resource client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Store the profile name and region for later use
        self.profile_name = profile_name
        self.region_name = region_name

        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    def get_log_groups(
        self, prefix: str = None, limit: int = 50, next_token: str = None
    ) -> str:
        """
        Get a list of CloudWatch Log Groups with optional filtering and pagination.

        Args:
            prefix: Optional prefix to filter log groups by name
            limit: Maximum number of log groups to return (default: 50)
            next_token: Token for pagination to get the next set of results

        Returns:
            JSON string with log groups information
        """
        kwargs = {"limit": limit}
        if prefix:
            kwargs["logGroupNamePrefix"] = prefix
        if next_token:
            kwargs["nextToken"] = next_token

        response = self.logs_client.describe_log_groups(**kwargs)
        log_groups = response.get("logGroups", [])

        # Format the log groups information
        formatted_groups = []
        for group in log_groups:
            formatted_groups.append(
                {
                    "name": group.get("logGroupName"),
                    "arn": group.get("arn"),
                    "storedBytes": group.get("storedBytes"),
                    "creationTime": datetime.fromtimestamp(
                        group.get("creationTime", 0) / 1000
                    ).isoformat(),
                }
            )

        # Include the nextToken if available
        result = {"logGroups": formatted_groups}

        if "nextToken" in response:
            result["nextToken"] = response["nextToken"]

        return json.dumps(result, indent=2)

    def get_log_group_details(self, log_group_name: str) -> str:
        """Get detailed information about a specific log group."""
        try:
            # The lookup is by prefix, so a group whose name merely starts with
            # log_group_name can match when no group has exactly that name
            response = self.logs_client.describe_log_groups(
                logGroupNamePrefix=log_group_name, limit=1
            )
            log_groups = response.get("logGroups", [])

            if not log_groups:
                return json.dumps(
                    {"error": f"Log group '{log_group_name}' not found"}, indent=2
                )

            log_group = log_groups[0]

            # Get retention policy
            retention = "Never Expire"
            if "retentionInDays" in log_group:
                retention = f"{log_group['retentionInDays']} days"

            # Get metrics for the log group
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=1)

            metrics_response = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods over the last day
                Statistics=["Sum"],
            )

            # Format the detailed information
            details = {
                "name": log_group.get("logGroupName"),
                "arn": log_group.get("arn"),
                "storedBytes": log_group.get("storedBytes"),
                "creationTime": datetime.fromtimestamp(
                    log_group.get("creationTime", 0) / 1000
                ).isoformat(),
                "retentionPolicy": retention,
                "metricFilterCount": log_group.get("metricFilterCount", 0),
                "kmsKeyId": log_group.get("kmsKeyId", "Not encrypted with KMS"),
                "dailyIncomingBytes": [
                    {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                    for point in metrics_response.get("Datapoints", [])
                ],
            }

            return json.dumps(details, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_streams(self, log_group_name: str, limit: int = 20) -> str:
        """
        Get a list of log streams for a specific log group.

        Args:
            log_group_name: The name of the log group
            limit: Maximum number of streams to return (default: 20)
        """
        try:
            response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=limit,
            )

            log_streams = response.get("logStreams", [])
            formatted_streams = []

            for stream in log_streams:
                last_event_time = stream.get("lastEventTimestamp", 0)
                first_event_time = stream.get("firstEventTimestamp", 0)

                formatted_streams.append(
                    {
                        "name": stream.get("logStreamName"),
                        "firstEventTime": datetime.fromtimestamp(
                            first_event_time / 1000
                        ).isoformat()
                        if first_event_time
                        else None,
                        "lastEventTime": datetime.fromtimestamp(
                            last_event_time / 1000
                        ).isoformat()
                        if last_event_time
                        else None,
                        "storedBytes": stream.get("storedBytes"),
                    }
                )

            return json.dumps(formatted_streams, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_events(
        self, log_group_name: str, log_stream_name: str, limit: int = 100
    ) -> str:
        """
        Get log events from a specific log stream.

        Args:
            log_group_name: The name of the log group
            log_stream_name: The name of the log stream
            limit: Maximum number of events to return (default: 100)
        """
        try:
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "ingestionTime": datetime.fromtimestamp(
                            event.get("ingestionTime", 0) / 1000
                        ).isoformat(),
                    }
                )

            return json.dumps(formatted_events, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_sample(self, log_group_name: str, limit: int = 10) -> str:
        """Get a sample of recent logs from a log group."""
        try:
            # First get the most recent stream
            stream_response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=1,
            )

            log_streams = stream_response.get("logStreams", [])
            if not log_streams:
                return json.dumps(
                    {"error": f"No streams found in log group '{log_group_name}'"},
                    indent=2,
                )

            # Get events from the most recent stream
            log_stream_name = log_streams[0].get("logStreamName")
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "streamName": log_stream_name,
                    }
                )

            return json.dumps(
                {
                    "description": f"Sample of {len(formatted_events)} recent logs from '{log_group_name}'",
                    "logStream": log_stream_name,
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_recent_errors(self, log_group_name: str, hours: int = 24) -> str:
        """Get recent error logs from a log group."""
        try:
            # Calculate the time window in epoch milliseconds
            end_time = int(datetime.now().timestamp() * 1000)
            start_time = int(
                (datetime.now() - timedelta(hours=hours)).timestamp() * 1000
            )

            # Use filter_log_events to search for errors across all streams
            # Common error patterns to search for
            error_patterns = [
                "ERROR",
                "Error",
                "error",
                "exception",
                "Exception",
                "EXCEPTION",
                "fail",
                "Fail",
                "FAIL",
            ]

            # In CloudWatch Logs filter syntax, space-separated terms must all
            # match; prefixing each term with "?" matches events containing any
            # one of them
            filter_pattern = " ".join(f"?{pattern}" for pattern in error_patterns)
            response = self.logs_client.filter_log_events(
                logGroupName=log_group_name,
                filterPattern=filter_pattern,
                startTime=start_time,
                endTime=end_time,
                limit=100,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "logStreamName": event.get("logStreamName"),
                    }
                )

            return json.dumps(
                {
                    "description": f"Recent errors from '{log_group_name}' in the last {hours} hours",
                    "totalErrors": len(formatted_events),
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_metrics(self, log_group_name: str, hours: int = 24) -> str:
        """Get log volume metrics for a log group."""
        try:
            # Create CloudWatch client
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")

            # Calculate start and end times
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=hours)

            # Get incoming bytes
            incoming_bytes = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Get incoming log events
            incoming_events = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingLogEvents",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Format metrics data
            bytes_datapoints = incoming_bytes.get("Datapoints", [])
            events_datapoints = incoming_events.get("Datapoints", [])

            bytes_datapoints.sort(key=lambda x: x["Timestamp"])
            events_datapoints.sort(key=lambda x: x["Timestamp"])

            bytes_data = [
                {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                for point in bytes_datapoints
            ]

            events_data = [
                {"timestamp": point["Timestamp"].isoformat(), "events": point["Sum"]}
                for point in events_datapoints
            ]

            # Calculate totals
            total_bytes = sum(point["Sum"] for point in bytes_datapoints)
            total_events = sum(point["Sum"] for point in events_datapoints)

            return json.dumps(
                {
                    "description": f"Log metrics for '{log_group_name}' over the last {hours} hours",
                    "totalBytes": total_bytes,
                    "totalEvents": total_events,
                    "bytesByHour": bytes_data,
                    "eventsByHour": events_data,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def analyze_log_structure(self, log_group_name: str) -> str:
        """Analyze and provide information about the structure of logs."""
        try:
            # Get a sample of logs to analyze
            sample_data = json.loads(self.get_log_sample(log_group_name, 50))

            if "error" in sample_data:
                return json.dumps(sample_data, indent=2)

            events = sample_data.get("events", [])

            if not events:
                return json.dumps(
                    {"error": "No log events found for analysis"}, indent=2
                )

            # Analyze the structure
            structure_info = {
                "description": f"Log structure analysis for '{log_group_name}'",
                "sampleSize": len(events),
                "format": self._detect_log_format(events),
                "commonPatterns": self._extract_common_patterns(events),
                "fieldAnalysis": self._analyze_fields(events),
            }

            return json.dumps(structure_info, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def _detect_log_format(self, events: List[Dict]) -> str:
        """Detect the format of logs (JSON, plaintext, etc.)."""
        json_count = 0
        key_value_count = 0
        xml_count = 0

        for event in events:
            message = event.get("message", "")

            # Check for JSON format
            if message.strip().startswith("{") and message.strip().endswith("}"):
                try:
                    json.loads(message)
                    json_count += 1
                    continue
                except json.JSONDecodeError:
                    pass

            # Check for XML format
            if message.strip().startswith("<") and message.strip().endswith(">"):
                xml_count += 1
                continue

            # Check for key-value pairs
            if re.search(r"\w+=[\'\"][^\'\"]*[\'\"]|\w+=\S+", message):
                key_value_count += 1

        total = len(events)

        # A format is reported only when more than 70% of the sample matches it
        if json_count > total * 0.7:
            return "JSON"
        elif xml_count > total * 0.7:
            return "XML"
        elif key_value_count > total * 0.7:
            return "Key-Value Pairs"
        else:
            return "Plaintext/Unstructured"

    def _extract_common_patterns(self, events: List[Dict]) -> Dict:
        """Extract common patterns from log messages."""
        # Look for common log patterns
        level_pattern = re.compile(
            r"\b(DEBUG|INFO|WARN|WARNING|ERROR|FATAL|CRITICAL)\b"
        )
        timestamp_patterns = [
            re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO format
            re.compile(
                r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
            ),  # Common datetime format
            re.compile(r"\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}"),  # MM/DD/YYYY format
        ]

        # Count occurrences
        levels = Counter()
        has_timestamp = 0

        for event in events:
            message = event.get("message", "")

            # Check log levels
            level_match = level_pattern.search(message)
            if level_match:
                levels[level_match.group(0)] += 1

            # Check timestamps in message content (not event timestamp)
            for pattern in timestamp_patterns:
                if pattern.search(message):
                    has_timestamp += 1
                    break

        return {
            "logLevels": dict(levels),
            "containsTimestamp": has_timestamp,
            "timestampPercentage": round((has_timestamp / len(events)) * 100, 2)
            if events
            else 0,
        }

    def _analyze_fields(self, events: List[Dict]) -> Dict:
        """Analyze fields in structured log messages."""
        format_type = self._detect_log_format(events)

        if format_type == "JSON":
            # Try to extract fields from JSON logs
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                try:
                    json_data = json.loads(message)
                except json.JSONDecodeError:
                    continue

                # Only JSON objects have fields; skip arrays and scalars
                if not isinstance(json_data, dict):
                    continue
                for key in json_data:
                    fields_count[key] += 1

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        elif format_type == "Key-Value Pairs":
            # Try to extract key-value pairs
            key_pattern = re.compile(r"(\w+)=[\'\"]?([^\'\"\s]*)[\'\"]?")
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                matches = key_pattern.findall(message)
                for key, _ in matches:
                    fields_count[key] += 1

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        else:
            return {
                "analysis": f"Field analysis not applicable for {format_type} format"
            }
```

--------------------------------------------------------------------------------
/src/client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import argparse
import json
import sys
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Set up argument parser for the CLI
parser = argparse.ArgumentParser(description="CloudWatch Logs MCP Client")
parser.add_argument(
    "--profile", type=str, help="AWS profile name to use for credentials"
)
parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
subparsers = parser.add_subparsers(dest="command", help="Command to execute")

# List log groups command
list_groups_parser = subparsers.add_parser(
    "list-groups", help="List CloudWatch log groups"
)
list_groups_parser.add_argument("--prefix", help="Filter log groups by name prefix")
list_groups_parser.add_argument(
    "--limit",
    type=int,
    default=50,
    help="Maximum number of log groups to return (default: 50)",
)
list_groups_parser.add_argument(
    "--next-token", help="Token for pagination to get the next set of results"
)
list_groups_parser.add_argument(
    "--use-tool", action="store_true", help="Use the tool interface instead of resource"
)
list_groups_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_groups_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log group details command
group_details_parser = subparsers.add_parser(
    "group-details", help="Get detailed information about a log group"
)
group_details_parser.add_argument("log_group_name", help="The name of the log group")
group_details_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
group_details_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

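# --- Editor's sketch, not part of the original client.py ---
# Each subcommand in this file declares its own --profile/--region pair. The
# sketch below shows one way those options could be shared through an argparse
# parent parser. The names build_demo_parser, aws_common, and demo are
# hypothetical. Using default=argparse.SUPPRESS on the shared options is an
# assumption about the desired behaviour: a value given before the subcommand
# is kept unless the subcommand sets it again.
import argparse


def build_demo_parser() -> argparse.ArgumentParser:
    """Build a two-command parser whose subcommands share the AWS options."""
    # Parent parser holding the shared options; add_help=False avoids a
    # duplicate -h/--help when it is pulled in through parents=[...]
    aws_common = argparse.ArgumentParser(add_help=False)
    aws_common.add_argument("--profile", default=argparse.SUPPRESS)
    aws_common.add_argument("--region", default=argparse.SUPPRESS)

    # Top-level parser: defaults of None guarantee both attributes exist
    demo = argparse.ArgumentParser(description="Demo of shared AWS options")
    demo.add_argument("--profile", default=None)
    demo.add_argument("--region", default=None)
    demo_sub = demo.add_subparsers(dest="command")

    # Subcommands inherit --profile/--region from the parent parser
    demo_sub.add_parser("list-groups", parents=[aws_common])
    details = demo_sub.add_parser("group-details", parents=[aws_common])
    details.add_argument("log_group_name")
    return demo


# --- End of editor's sketch ---
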
57 | # List log streams command 58 | list_streams_parser = subparsers.add_parser( 59 | "list-streams", help="List log streams for a specific log group" 60 | ) 61 | list_streams_parser.add_argument("log_group_name", help="The name of the log group") 62 | list_streams_parser.add_argument( 63 | "--profile", help="AWS profile name to use for credentials" 64 | ) 65 | list_streams_parser.add_argument( 66 | "--region", help="AWS region name to use for API calls" 67 | ) 68 | 69 | # Get log events command 70 | get_events_parser = subparsers.add_parser( 71 | "get-events", help="Get log events from a specific log stream" 72 | ) 73 | get_events_parser.add_argument("log_group_name", help="The name of the log group") 74 | get_events_parser.add_argument("log_stream_name", help="The name of the log stream") 75 | get_events_parser.add_argument( 76 | "--profile", help="AWS profile name to use for credentials" 77 | ) 78 | get_events_parser.add_argument("--region", help="AWS region name to use for API calls") 79 | 80 | # Get log sample command 81 | sample_parser = subparsers.add_parser( 82 | "sample", help="Get a sample of recent logs from a log group" 83 | ) 84 | sample_parser.add_argument("log_group_name", help="The name of the log group") 85 | sample_parser.add_argument( 86 | "--limit", type=int, default=10, help="Number of logs to sample (default: 10)" 87 | ) 88 | sample_parser.add_argument("--profile", help="AWS profile name to use for credentials") 89 | sample_parser.add_argument("--region", help="AWS region name to use for API calls") 90 | 91 | # Get recent errors command 92 | errors_parser = subparsers.add_parser( 93 | "recent-errors", help="Get recent error logs from a log group" 94 | ) 95 | errors_parser.add_argument( 96 | "log_group_name", help="The name of the log group to analyze" 97 | ) 98 | errors_parser.add_argument( 99 | "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" 100 | ) 101 | errors_parser.add_argument("--profile", help="AWS 
profile name to use for credentials") 102 | errors_parser.add_argument("--region", help="AWS region name to use for API calls") 103 | 104 | # Get log metrics command 105 | metrics_parser = subparsers.add_parser( 106 | "metrics", help="Get log volume metrics for a log group" 107 | ) 108 | metrics_parser.add_argument( 109 | "log_group_name", help="The name of the log group to analyze" 110 | ) 111 | metrics_parser.add_argument( 112 | "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" 113 | ) 114 | metrics_parser.add_argument("--profile", help="AWS profile name to use for credentials") 115 | metrics_parser.add_argument("--region", help="AWS region name to use for API calls") 116 | 117 | # Analyze log structure command 118 | structure_parser = subparsers.add_parser( 119 | "structure", help="Analyze the structure of logs in a log group" 120 | ) 121 | structure_parser.add_argument( 122 | "log_group_name", help="The name of the log group to analyze" 123 | ) 124 | structure_parser.add_argument( 125 | "--profile", help="AWS profile name to use for credentials" 126 | ) 127 | structure_parser.add_argument("--region", help="AWS region name to use for API calls") 128 | 129 | # Get analyze logs prompt command 130 | prompt_parser = subparsers.add_parser( 131 | "get-prompt", help="Get a prompt for analyzing CloudWatch logs" 132 | ) 133 | prompt_parser.add_argument( 134 | "log_group_name", help="The name of the log group to analyze" 135 | ) 136 | prompt_parser.add_argument("--profile", help="AWS profile name to use for credentials") 137 | prompt_parser.add_argument("--region", help="AWS region name to use for API calls") 138 | 139 | # Get list groups prompt command 140 | list_prompt_parser = subparsers.add_parser( 141 | "list-prompt", help="Get a prompt for listing CloudWatch log groups" 142 | ) 143 | list_prompt_parser.add_argument( 144 | "--prefix", help="Optional prefix to filter log groups by name" 145 | ) 146 | list_prompt_parser.add_argument( 
147 | "--profile", help="AWS profile name to use for credentials" 148 | ) 149 | list_prompt_parser.add_argument("--region", help="AWS region name to use for API calls") 150 | 151 | # Search logs command 152 | search_parser = subparsers.add_parser( 153 | "search", help="Search for patterns in CloudWatch logs" 154 | ) 155 | search_parser.add_argument("log_group_name", help="The name of the log group to search") 156 | search_parser.add_argument( 157 | "query", help="The search query (CloudWatch Logs Insights syntax)" 158 | ) 159 | search_parser.add_argument( 160 | "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" 161 | ) 162 | search_parser.add_argument( 163 | "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" 164 | ) 165 | search_parser.add_argument( 166 | "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)" 167 | ) 168 | search_parser.add_argument("--profile", help="AWS profile name to use for credentials") 169 | search_parser.add_argument("--region", help="AWS region name to use for API calls") 170 | 171 | # Search multiple log groups command 172 | search_multi_parser = subparsers.add_parser( 173 | "search-multi", help="Search for patterns across multiple CloudWatch log groups" 174 | ) 175 | search_multi_parser.add_argument( 176 | "log_group_names", nargs="+", help="List of log group names to search" 177 | ) 178 | search_multi_parser.add_argument( 179 | "query", help="The search query (CloudWatch Logs Insights syntax)" 180 | ) 181 | search_multi_parser.add_argument( 182 | "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" 183 | ) 184 | search_multi_parser.add_argument( 185 | "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" 186 | ) 187 | search_multi_parser.add_argument( 188 | "--end-time", type=str, help="End time (ISO8601, e.g. 
2024-06-01T23:59:59Z)"
)
search_multi_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
search_multi_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# Summarize log activity command
summarize_parser = subparsers.add_parser(
    "summarize", help="Generate a summary of log activity"
)
summarize_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
summarize_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
summarize_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
summarize_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
summarize_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
summarize_parser.add_argument("--region", help="AWS region name to use for API calls")

# Find error patterns command
errors_parser = subparsers.add_parser(
    "find-errors", help="Find common error patterns in logs"
)
errors_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
errors_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
errors_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
errors_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
errors_parser.add_argument("--profile", help="AWS profile name to use for credentials")
errors_parser.add_argument("--region", help="AWS region name to use for API calls")

# Correlate logs command
correlate_parser = subparsers.add_parser(
    "correlate", help="Correlate logs across multiple AWS services"
)
correlate_parser.add_argument(
    "log_group_names", nargs="+", help="List of log group names to search"
)
correlate_parser.add_argument("search_term", help="Term to search for in logs")
correlate_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
correlate_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
correlate_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
correlate_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
correlate_parser.add_argument("--region", help="AWS region name to use for API calls")


def add_aws_config_args(tool_args, args):
    """Add profile and region arguments to tool calls if specified."""
    if args.profile:
        tool_args["profile"] = args.profile
    if args.region:
        tool_args["region"] = args.region
    return tool_args


async def main():
    """Main function to run the CloudWatch Logs MCP client."""
    args = parser.parse_args()

    # Determine the server path (relative or absolute)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    server_path = os.path.join(script_dir, "cw-mcp-server", "server.py")

    # Prepare server arguments
    server_args = [server_path]
    if args.profile:
        server_args.extend(["--profile", args.profile])
    if args.region:
        server_args.extend(["--region", args.region])

    # Create server parameters
    server_params = StdioServerParameters(command="python3", args=server_args, env=None)

    # Connect to the server
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the client session
            await session.initialize()

            # Check if a command was specified
            if args.command is None:
                parser.print_help()
                return

            try:
                # Execute the requested command
                if args.command == "list-groups":
                    if args.use_tool:
                        # Use the tool interface
                        tool_args = {}
                        if args.prefix:
                            tool_args["prefix"] = args.prefix
                        if args.limit:
                            tool_args["limit"] = args.limit
                        if args.next_token:
                            tool_args["next_token"] = args.next_token
                        tool_args = add_aws_config_args(tool_args, args)

                        result = await session.call_tool(
                            "list_log_groups", arguments=tool_args
                        )
                        print_json_response(result)
                    else:
                        # Use the resource interface
                        # Build query string for parameters if provided
                        if args.prefix:
                            # If prefix is provided, use the filtered endpoint
                            resource_uri = f"logs://groups/filter/{args.prefix}"
                        else:
                            resource_uri = "logs://groups"

                        content, _ = await session.read_resource(resource_uri)
                        print_json_response(content)

                elif args.command == "group-details":
                    resource_uri = f"logs://groups/{args.log_group_name}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "list-streams":
                    resource_uri = f"logs://groups/{args.log_group_name}/streams"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "get-events":
                    resource_uri = f"logs://groups/{args.log_group_name}/streams/{args.log_stream_name}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "sample":
                    resource_uri = (
                        f"logs://groups/{args.log_group_name}/sample?limit={args.limit}"
                    )
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "recent-errors":
                    resource_uri = f"logs://groups/{args.log_group_name}/recent-errors?hours={args.hours}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "metrics":
                    resource_uri = f"logs://groups/{args.log_group_name}/metrics?hours={args.hours}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "structure":
                    resource_uri = f"logs://groups/{args.log_group_name}/structure"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "get-prompt":
                    # Get the analyze logs prompt from the server
                    arguments = {"log_group_name": args.log_group_name}
                    arguments = add_aws_config_args(arguments, args)
                    result = await session.get_prompt(
                        "analyze_cloudwatch_logs",
                        arguments=arguments,
                    )

                    # Extract and print the prompt text
                    prompt_messages = result.messages
                    if prompt_messages and len(prompt_messages) > 0:
                        message = prompt_messages[0]
                        if hasattr(message, "content") and hasattr(
                            message.content, "text"
                        ):
                            print(message.content.text)
                        else:
                            print(
                                json.dumps(
                                    message, default=lambda x: x.__dict__, indent=2
                                )
                            )
                    else:
                        print("No prompt received.")

                elif args.command == "list-prompt":
                    # Get arguments for the prompt
                    arguments = {}
                    if args.prefix:
                        arguments["prefix"] = args.prefix
                    arguments = add_aws_config_args(arguments, args)

                    # Get the list logs prompt from the server
                    result = await session.get_prompt(
                        "list_cloudwatch_log_groups", arguments=arguments
                    )

                    # Extract and print the prompt text
                    prompt_messages = result.messages
                    if prompt_messages and len(prompt_messages) > 0:
                        message = prompt_messages[0]
                        if hasattr(message, "content") and hasattr(
                            message.content, "text"
                        ):
                            print(message.content.text)
                        else:
                            print(
                                json.dumps(
                                    message, default=lambda x: x.__dict__, indent=2
                                )
                            )
                    else:
                        print("No prompt received.")

                elif args.command == "search":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                        "query": args.query,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "search_logs",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "search-multi":
                    tool_args = {
                        "log_group_names": args.log_group_names,
                        "query": args.query,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "search_logs_multi",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "summarize":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "summarize_log_activity",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "find-errors":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "find_error_patterns",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "correlate":
                    tool_args = {
                        "log_group_names": args.log_group_names,
                        "search_term": args.search_term,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "correlate_logs",
                        arguments=tool_args,
                    )
                    print_json_response(result)

            except Exception as e:
                print(f"Error: {str(e)}", file=sys.stderr)
                sys.exit(1)


def print_json_response(content: str | tuple | object | None):
    """Print JSON content in a formatted way.

    Args:
        content: The content to print, which could be:
            - String (direct JSON content)
            - Tuple (from read_resource, where the first element is the content)
            - Object with .content or .text attributes (from CallToolResult)
            - None
    """
    try:
        # Handle None case
        if content is None:
            print("No content received.")
            return

        # For Session.read_resource responses, which returns tuple (meta, content)
        # but we found that sometimes content is None
        if isinstance(content, tuple):
            meta, content_text = (
                content
                if len(content) >= 2
                else (content[0] if len(content) == 1 else None, None)
            )

            # If we have usable content in the second element, use it
            if content_text is not None:
                content = content_text
            # Otherwise, if meta looks usable, try that
            elif isinstance(meta, str) and meta != "meta":
                content = meta
            # We don't have usable content in the tuple
            else:
                print("No usable content found in the response.")
                return

        # Handle object with content attribute (from CallToolResult)
        if hasattr(content, "content"):
            content = content.content

        # Handle object with text attribute
        if hasattr(content, "text"):
            content = content.text

        # Handle CallToolResult content from mcp_types which can be a list
        if isinstance(content, list) and all(hasattr(item, "text") for item in content):
            # Extract text from each item
            extracted_texts = [item.text for item in content if item.text]
            if extracted_texts:
                content = extracted_texts[0]  # Use the first text element

        # Handle if content is a custom object with __str__ method
        if not isinstance(content, (str, bytes, bytearray)) and hasattr(
            content, "__str__"
        ):
            content = str(content)

        # Try to handle various formats
        if isinstance(content, str):
            try:
                # Try to parse as JSON
                parsed = json.loads(content)
                print(json.dumps(parsed, indent=2))
            except json.JSONDecodeError:
                # Not valid JSON, just print the string
                print(content)
        elif isinstance(content, (dict, list)):
            # Direct Python objects
            print(json.dumps(content, indent=2, default=lambda x: str(x)))
        else:
            # Fall back to string representation
            print(content)

    except Exception as e:
        # Catch-all for any unexpected errors
        print(f"Error processing response: {e}")
        print(content)


if __name__ == "__main__":
    asyncio.run(main())
```
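The `search`, `search-multi`, `summarize`, `find-errors`, and `correlate` branches above all repeat the same time-window rule: pass `start_time` and `end_time` when given, and fall back to `hours` only when neither is set. A minimal sketch of how that rule could be factored out is shown below. The helper name `build_time_window_args` is hypothetical and not part of `client.py`; it assumes the parsed arguments expose `start_time`, `end_time`, and `hours` as in the subparsers defined above.

```python
import argparse


def build_time_window_args(args: argparse.Namespace) -> dict:
    """Return the time-related tool arguments for a parsed command line.

    Hypothetical helper (not in the original client.py). Explicit start/end
    times take precedence; ``hours`` is sent only when neither is provided,
    mirroring the checks repeated in each tool branch of main().
    """
    window = {}
    if getattr(args, "start_time", None):
        window["start_time"] = args.start_time
    if getattr(args, "end_time", None):
        window["end_time"] = args.end_time
    if not window:
        # Neither bound was given, so fall back to the relative look-back.
        window["hours"] = args.hours
    return window


if __name__ == "__main__":
    # Illustrative values only; the log group name is made up.
    example = argparse.Namespace(start_time=None, end_time=None, hours=24)
    tool_args = {"log_group_name": "/aws/lambda/example"}
    tool_args.update(build_time_window_args(example))
    print(tool_args)
```

With a helper like this, each tool branch would only need to build its command-specific keys and merge in the shared time window before calling `add_aws_config_args`.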