# Directory Structure

```
├── .github
│   ├── pull_request_template.md
│   └── workflows
│       └── pre-commit.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docs
│   ├── ai-integration.md
│   ├── architecture.md
│   ├── assets
│   │   ├── claude-desktop-settings.png
│   │   ├── claude-server-disconnected.png
│   │   └── Log-Analyzer-with-MCP-arch.png
│   ├── aws-config.md
│   ├── features.md
│   ├── troubleshooting.md
│   └── usage.md
├── LICENSE
├── NOTICE
├── pyproject.toml
├── README.md
├── src
│   ├── client.py
│   └── cw-mcp-server
│       ├── __init__.py
│       ├── resources
│       │   ├── __init__.py
│       │   └── cloudwatch_logs_resource.py
│       ├── server.py
│       └── tools
│           ├── __init__.py
│           ├── analysis_tools.py
│           ├── correlation_tools.py
│           ├── search_tools.py
│           └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.10
    hooks:
      - id: ruff-check
        args: [--fix]
      - id: ruff-format

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[co]
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo
*~

# Testing
.coverage
htmlcov/
.tox/
.nox/
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# ruff
.ruff_cache/

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# OS specific
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Logs
*.log
logs/
log/

# Local development
.env
.env.local
.env.*.local

# Misc
*.bak
*.tmp
*.temp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Log Analyzer with MCP

A [Model Context Protocol (MCP)](https://modelcontextprotocol.io) server that provides AI assistants access to AWS CloudWatch Logs for analysis, searching, and correlation.

## 🏗️ Architecture
![Architecture Diagram](./docs/assets/Log-Analyzer-with-MCP-arch.png)

## 🔌 Model Context Protocol (MCP)

As outlined by Anthropic:
> MCP is an open protocol that standardizes how applications provide context to LLMs. Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect your devices to various peripherals and accessories, MCP provides a standardized way to connect AI models to different data sources and tools.

This repository is an example client and server that allows an AI assistant like Claude to interact with CloudWatch logs in an AWS account. To learn more about MCP, read through the [introduction](https://modelcontextprotocol.io/introduction). 

## ✨ Features

- Browse and search CloudWatch Log Groups
- Search logs using CloudWatch Logs Insights query syntax
- Generate log summaries and identify error patterns
- Correlate logs across multiple AWS services
- AI-optimized tools for assistants like Claude

[Detailed feature list](./docs/features.md)

## 🚀 Installation

### Prerequisites

- The [uv](https://github.com/astral-sh/uv) Python package and project manager
- An AWS account with CloudWatch Logs
- Configured [AWS credentials](./docs/aws-config.md)


### Setup

```bash
# Clone the repository
git clone https://github.com/awslabs/Log-Analyzer-with-MCP.git
cd Log-Analyzer-with-MCP

# Create a virtual environment and install dependencies
uv sync
source .venv/bin/activate  # On Windows, use `.venv\Scripts\activate`
```

## 🚦 Quick Start

1. Make sure to have configured your AWS credentials as [described here](./docs/aws-config.md)

2. Update your `claude_desktop_config.json` file with the proper configuration outlined in the [AI integration guide](./docs/ai-integration.md)

3. Open Claude for Desktop and start chatting!

For more examples and advanced usage, see the [detailed usage guide](./docs/usage.md).

## 🤖 AI Integration

This project can be easily integrated with AI assistants like Claude for Desktop. See the [AI integration guide](./docs/ai-integration.md) for details.

## 📚 Documentation

- [Detailed Features](./docs/features.md)
- [Usage Guide](./docs/usage.md)
- [AWS Configuration](./docs/aws-config.md)
- [Architecture Details](./docs/architecture.md)
- [AI Integration](./docs/ai-integration.md)
- [Troubleshooting](./docs/troubleshooting.md)

## 🔒 Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

## 📄 License

This project is licensed under the Apache-2.0 License.
```

--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------

```markdown
## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
opensource-codeofconduct@amazon.com with any additional questions or comments.

```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing Guidelines

Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional
documentation, we greatly value feedback and contributions from our community.

Please read through this document before submitting any issues or pull requests to ensure we have all the necessary
information to effectively respond to your bug report or contribution.


## Reporting Bugs/Feature Requests

We welcome you to use the GitHub issue tracker to report bugs or suggest features.

When filing an issue, please check existing open, or recently closed, issues to make sure somebody else hasn't already
reported the issue. Please try to include as much information as you can. Details like these are incredibly useful:

* A reproducible test case or series of steps
* The version of our code being used
* Any modifications you've made relevant to the bug
* Anything unusual about your environment or deployment


## Contributing via Pull Requests
Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that:

1. You are working against the latest source on the *main* branch.
2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already.
3. You open an issue to discuss any significant work - we would hate for your time to be wasted.

To send us a pull request, please:

1. Fork the repository.
2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
3. Ensure local tests pass.
4. Commit to your fork using clear commit messages.
5. Send us a pull request, answering any default questions in the pull request interface.
6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.

GitHub provides additional documentation on [forking a repository](https://help.github.com/articles/fork-a-repo/) and
[creating a pull request](https://help.github.com/articles/creating-a-pull-request/).


## Finding contributions to work on
Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels (enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any 'help wanted' issues is a great place to start.


## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
opensource-codeofconduct@amazon.com with any additional questions or comments.


## Security issue notifications
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue.


## Licensing

See the [LICENSE](LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution.

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/resources/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "Log-Analyzer-with-MCP"
version = "0.1.0"
description = "An MCP server that provides AI assistants access to CloudWatch Logs"
readme = "README.md"
requires-python = ">=3.12"
authors = [
  {name = "Aditya Addepalli", email = "[email protected]"},
]
dependencies = [
    "boto3>=1.37.11",
    "mcp[cli]>=1.6.0",
]

[dependency-groups]
dev = [
    "ruff>=0.11.10",
]

```

--------------------------------------------------------------------------------
/.github/workflows/pre-commit.yml:
--------------------------------------------------------------------------------

```yaml
name: pre-commit

on:
  pull_request:
  push:
    branches: [main]
permissions: {}
jobs:
  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"
      - name: pre-commit setup
        run: pip install pre-commit
      - name: run linter
        run: pre-commit run --all-files

```

--------------------------------------------------------------------------------
/docs/features.md:
--------------------------------------------------------------------------------

```markdown
# Detailed Features

## 🔍 Log Group Discovery
- Browse available CloudWatch Log Groups
- Filter log groups by name prefix
- Paginate through large sets of log groups

## 🔎 Search Capabilities
- Search for specific error patterns or anomalies in CloudWatch Logs
- Apply CloudWatch Logs Insights query syntax for powerful filtering
- Limit results to specific time ranges

## 📊 Log Analysis
- Generate summaries of log activity over time
- Identify common error patterns and exceptions
- Analyze log structure and format automatically
- View frequency of different error types

## 🔄 Cross-Service Correlation
- Correlate logs across multiple AWS services
- Track request flows through distributed systems
- Identify related events across different log groups

## 🤖 AI-Optimized Tools
- Direct tool access for AI assistants
- Specialized prompts for guided log analysis
- Structured data responses for easy consumption 
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/__init__.py:
--------------------------------------------------------------------------------

```python
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import functools
import json
import traceback
from typing import Callable


def handle_exceptions(func: Callable) -> Callable:
    """
    Decorator for handling exceptions in tool methods.
    Ensures that all tool methods return a standardized error response
    rather than raising exceptions that would cause the client to fail.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error_traceback = traceback.format_exc()
            error_response = {
                "status": "Error",
                "error": str(e),
                "error_type": e.__class__.__name__,
                "details": error_traceback.split("\n")[-2] if error_traceback else None,
            }
            return json.dumps(error_response, indent=2)

    return wrapper

```
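
As an illustration (the class and method below are not part of the repository), a tool method wrapped with this decorator returns the standardized JSON error payload instead of raising:

```python
import asyncio


class DemoTools:
    @handle_exceptions
    async def failing_tool(self) -> str:
        # Any raised exception is converted into a JSON error response by the decorator
        raise ValueError("simulated failure")


# Prints {"status": "Error", "error": "simulated failure", "error_type": "ValueError", ...}
print(asyncio.run(DemoTools().failing_tool()))
```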

--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------

```markdown
<!-- markdownlint-disable MD041 MD043 -->
**Issue number:**

## Summary

### Changes

> Please provide a summary of what's being changed

### User experience

> Please share what the user experience looks like before and after this change

## Checklist

If an item doesn't apply to your change, please leave it unchecked.

* [ ] I have reviewed the [contributing guidelines](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/CONTRIBUTING.md)
* [ ] I have performed a self-review of this change
* [ ] Changes have been tested
* [ ] Changes are documented

<details>
<summary>Is this a breaking change?</summary>

**RFC issue number**:

Checklist:

* [ ] Migration process documented
* [ ] Implement warnings (if it can live side by side)

</details>

## Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of the [project license](https://github.com/awslabs/Log-Analyzer-with-MCP/blob/main/LICENSE).

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/utils.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from datetime import datetime, timedelta
import dateutil.parser


def get_time_range(hours: int, start_time: str = None, end_time: str = None):
    """
    Calculate time range timestamps from hours or exact start/end times.

    Args:
        hours: Number of hours to look back (used if start_time is not provided)
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time

    Returns:
        Tuple of (start_timestamp, end_timestamp) in milliseconds since epoch
    """
    if start_time:
        start_ts = int(dateutil.parser.isoparse(start_time).timestamp() * 1000)
    else:
        start_ts = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    if end_time:
        end_ts = int(dateutil.parser.isoparse(end_time).timestamp() * 1000)
    else:
        end_ts = int(datetime.now().timestamp() * 1000)

    return start_ts, end_ts

```
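
A brief usage sketch, assuming `get_time_range` is imported from this module (the timestamps below are illustrative):

```python
# Look back 6 hours from now
start_ts, end_ts = get_time_range(hours=6)

# Or pin an exact window with ISO8601 timestamps; hours is ignored when start_time is given
start_ts, end_ts = get_time_range(
    hours=24,
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-01T06:00:00Z",
)
# Both values are milliseconds since the epoch, as expected by start_query/filter_log_events
print(start_ts, end_ts)
```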

--------------------------------------------------------------------------------
/docs/architecture.md:
--------------------------------------------------------------------------------

```markdown
# 🏗️ Architecture Details

The following diagram illustrates the high-level architecture of the MCP CloudWatch Log Analyzer:

![Architecture Diagram](../docs/assets/Log-Analyzer-with-MCP-arch.png)

The architecture consists of three main components:

## 💻 Client Side
- AWS Credentials are configured on the client machine
- The local computer runs the MCP client applications
- MCP Server runs locally and manages the communication

## ☁️ AWS Cloud
- CloudWatch service provides the log data and search capabilities

## 🔄 Data Flow
- AWS Credentials flow from configuration to the client
- The client communicates with CloudWatch through the MCP Server
- The MCP Server mediates all interactions with AWS services

The project follows the Model Context Protocol architecture:

## 📚 Resources
Expose CloudWatch log groups, streams, and events as addressable URIs
- `logs://groups` - List all log groups
- `logs://groups/{log_group_name}/streams` - List streams for a specific group
- `logs://groups/{log_group_name}/streams/{log_stream_name}` - Get events from a stream
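
Each URI is registered as a FastMCP resource handler in `server.py` (shown later in this repository); condensed excerpt:

```python
@mcp.resource("logs://groups/{log_group_name}/streams")
def get_log_streams(log_group_name: str) -> str:
    """Get a list of log streams for a specific log group."""
    limit = 20  # default page size
    return cw_resource.get_log_streams(log_group_name, limit)
```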

## 🧰 Tools
Provide functionality for log analysis, search, and correlation
- `list_log_groups` - List and filter available log groups
- `search_logs` - Search logs with CloudWatch Insights queries
- `summarize_log_activity` - Generate time-based activity summaries
- `find_error_patterns` - Identify common error patterns
- `correlate_logs` - Find related events across multiple log groups

## 💬 Prompts
Guide AI assistants through common workflows
- `list_cloudwatch_log_groups` - Help explore available log groups
- `analyze_cloudwatch_logs` - Guide log analysis process

## 🖥️ Server
Handles MCP protocol communication and AWS API integration
- Manages API access to CloudWatch Logs
- Handles asynchronous CloudWatch Logs Insights queries
- Provides structured data responses

## 📱 Client
Command-line interface for interacting with the server
- Parses commands and arguments
- Connects to the server via stdio
- Formats JSON responses for human readability 
```

--------------------------------------------------------------------------------
/docs/aws-config.md:
--------------------------------------------------------------------------------

```markdown
# 🔐 AWS Configuration Guide

For the MCP server to access your AWS CloudWatch Logs, you need to configure AWS credentials, which you can learn how to do [here](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html). The server uses boto3's credential resolution chain, which checks several locations in the following order:

1. **Environment variables**:
   ```bash
   export AWS_ACCESS_KEY_ID="your-access-key"
   export AWS_SECRET_ACCESS_KEY="your-secret-key"
   export AWS_REGION="us-east-1"
   ```

2. **Shared credential file** (`~/.aws/credentials`):
   ```ini
   [default]
   aws_access_key_id = your-access-key
   aws_secret_access_key = your-secret-key
   ```
   
   If you're seeing errors like `An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied`, make sure to add your credentials in this format:
   ```ini
   [default]
   aws_access_key_id = your-access-key
   aws_secret_access_key = your-secret-key
   
   # For temporary credentials, add the session token
   [temp-profile]
   aws_access_key_id = your-temp-access-key
   aws_secret_access_key = your-temp-secret-key
   aws_session_token = your-session-token
   ```

   Check out the [troubleshooting guide](./troubleshooting.md) for more information.

3. **AWS config file** (`~/.aws/config`):
   ```ini
   [default]
   region = us-east-1
   ```

You can set up your AWS credentials using the AWS CLI:

```bash
aws configure
```

## Using a Specific AWS Profile or Region

1. **Server Start-up**

   If you have multiple AWS profiles or want to specify a region, use:
   
   ```bash
   python src/cw-mcp-server/server.py --profile your-profile-name --region us-west-2
   ```

2. **Per-Call Override**

   Override the profile or region on individual AI prompts or tool calls:
   
   > Example: Get a list of CloudWatch log groups using the "dev-account" profile in "eu-central-1" region.

   Once you set a profile or region, the LLM keeps using it for follow-ups. Only specify a new profile or region when you need to switch accounts or regions.

This is useful when you need to access CloudWatch logs in different AWS accounts or regions.
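
Under the hood, the server and its tool classes resolve these options through a boto3 session. A minimal sketch of that resolution, mirroring the tool constructors in this repository (the helper name `make_logs_client` is illustrative):

```python
import boto3


def make_logs_client(profile_name: str | None = None, region_name: str | None = None):
    # When profile/region are None, boto3 falls back to its default credential chain
    # (environment variables, ~/.aws/credentials, ~/.aws/config, instance roles, ...)
    session = boto3.Session(profile_name=profile_name, region_name=region_name)
    return session.client("logs")
```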

## 🛡️ Required Permissions

The MCP server requires permissions to access CloudWatch Logs. At minimum, ensure your IAM user or role has the following policies:
- `CloudWatchLogsReadOnlyAccess`

```

--------------------------------------------------------------------------------
/docs/troubleshooting.md:
--------------------------------------------------------------------------------

```markdown
# 🔧 Troubleshooting Guide

There may be various issues you can run into while setting this up. Here are some tips on troubleshooting:

## ⚠️ Common Issues

**Server Disconnected**:
```
MCP cw-mcp-server: Server Disconnected.
```
1. Ensure your JSON configuration file is set up properly as described in the [AI integration guide](./ai-integration.md).
2. Ensure you've set up your AWS credentials properly according to the [AWS configuration guide](./aws-config.md).

**Authentication Errors**:
```
Error: An error occurred (AccessDenied) when calling the DescribeLogGroups operation: Access denied
```
Ensure your AWS credentials are properly configured and have the necessary permissions to access CloudWatch Logs:

1. Check if your credentials file exists:
   ```bash
   cat ~/.aws/credentials
   ```

2. Verify you have the required permissions (CloudWatchLogsReadOnlyAccess) for the assumed role you are using.

3. If using temporary credentials, ensure your session token is included in your `~/.aws/credentials` file:
   ```ini
   [profile-name]
   aws_access_key_id = your-temp-access-key
   aws_secret_access_key = your-temp-secret-key
   aws_session_token = your-session-token
   ```

4. Test your credentials directly with AWS CLI:
   ```bash
   aws logs describe-log-groups
   ```

**Resource Not Found**:
```
Error: An error occurred (ResourceNotFoundException) when calling the GetLogEvents operation
```
Check that the log group and stream names are correct. Log stream names are case sensitive.

**Connection Issues**:
```
Error: Failed to connect to MCP server
```
Verify that the server is running and accessible. Check file paths in your `claude_desktop_config.json` or client configuration.

**Query Timeout**:
```
"status": "Timeout", "error": "Search query failed to complete"
```
For complex queries or large log volumes, try reducing the time range using the `--hours` parameter.

**Claude terminating request**:

This could be due to a query timeout or an invalid response from CloudWatch (for example, performing operations on a non-existent log group).

In this case, check the server logs via the Claude Desktop settings:

![Claude Desktop Settings](./assets/claude-desktop-settings.png)

Then click `Open Logs Folder` and open the `mcp-server-cw-mcp-server.log` file to see more details.

**Amazon Q CLI terminating request**:

If you have issues with your configuration, Amazon Q CLI will start (without any MCP server tools) and show an error similar to:
```
WARNING: Error reading global mcp config: expected value at line 9 column 19
Please check to make sure config is correct. Discarding.
```
You might also see timeout issues if it is struggling to find and start the server you have configured in your `mcp.json`, for example:
```
x mcp_server has failed to load:
- Operation timed out: recv for initialization
- run with Q_LOG_LEVEL=trace and see $TMPDIR/qlog for detail
x 0 of 1 mcp servers initialized
```
You can go to `$TMPDIR/qlog` to find the generated logs, and you can set `Q_LOG_LEVEL` to `trace`, `debug`, `info`, or `warn` to get debug output that helps you troubleshoot any issues you run into.


## 🆘 Getting Help

If you encounter issues not covered in this troubleshooting section, please:

1. Check the server logs for detailed error messages
2. Verify your AWS permissions and configuration

If you're still facing issues, please open a GitHub Issue.

```

--------------------------------------------------------------------------------
/docs/ai-integration.md:
--------------------------------------------------------------------------------

```markdown
# AI Integration Guide

## 🖥️ Claude Desktop Integration

You can add the MCP server configuration to Claude for Desktop for AI-assisted log analysis.

To install Claude for Desktop and learn how to add an MCP server, see [this guide](https://modelcontextprotocol.io/quickstart/user). Then add this to your `claude_desktop_config.json` file:

```json
{
  "mcpServers": {
    "cw-mcp-server": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server",
        "run",
        "server.py"
        // You can add "--profile", "your-profile" and/or "--region", "us-west-2" here if needed but it will pull it from your AWS credentials as well
      ]
    }
  }
}
```

## 🤖 Amazon Q CLI Integration

Amazon Q CLI acts as an MCP Client. To connect to MCP Servers and access the tools they surface, you need to create a configuration file called `mcp.json` in your Amazon Q configuration directory.

Your directory structure should look like this:

```bash
~/.aws
└── amazonq
    ├── mcp.json
    ├── profiles
    ├── cache
    ├── history
    └── prompts
```

If `mcp.json` does not exist or is empty, edit it to add this MCP server configuration:

```json
{
  "mcpServers": {
    "cw-mcp-server": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/Log-Analyzer-with-MCP/src/cw-mcp-server",
        "run",
        "server.py"
        // Optionally add "--profile", "your-profile" and/or "--region", "us-west-2" here if needed but it will pull it from your AWS credentials as well
      ]
    }
  }
}
```

### Testing the configuration
Every time you start Amazon Q CLI, it will attempt to load any configured MCP Servers. You should see output indicating that the MCP Server has been discovered and initialized.

![image](https://github.com/user-attachments/assets/9acc1632-5a9a-4465-9fdc-a8464640f6a6)

If you're running into issues, check out the [troubleshooting guide](./troubleshooting.md) or open a GitHub Issue. 

## 🔍 AI Assistant Capabilities

With the enhanced tool support, AI assistants can now:

1. **Discover Log Groups**:
   - "Show me all my CloudWatch log groups"
   - "List log groups that start with /aws/lambda"
   - "Show me the next page of log groups"

2. **Understand Log Structure**:
   - "Analyze the structure of my API Gateway logs"
   - "What fields are common in these JSON logs?"
   - "Show me a sample of recent logs from this group"

3. **Diagnose Issues**:
   - "Find all errors in my Lambda logs from the past 24 hours"
   - "What's the most common error pattern in this log group?"
   - "Show me logs around the time this service crashed"

4. **Perform Analysis**:
   - "Compare log volumes between these three services"
   - "Find correlations between errors in my database and API logs"
   - "Analyze the trend of timeouts in my Lambda function"

> You can specify a different AWS profile or region in your prompt, e.g. "Show me all my CloudWatch log groups using <profile_name> profile in <region> region"

## 💬 AI Prompt Templates

The server provides specialized prompts that AI assistants can use:

1. **List and Explore Log Groups Prompt**:
   ```
   I'll help you explore the CloudWatch log groups in your AWS environment.
   First, I'll list the available log groups...
   ```

2. **Log Analysis Prompt**:
   ```
   Please analyze the following CloudWatch logs from the {log_group_name} log group.
   First, I'll get you some information about the log group...
   ```
3. **Profile/Region Override**:
   ```
   I'll help you list CloudWatch log groups using the <profile_name> profile in the <region> region. Let me do that for you:
   ```

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/correlation_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import boto3
import json
import time
from datetime import datetime
from typing import List

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsCorrelationTools:
    """Tools for correlating logs across multiple CloudWatch Log groups."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def correlate_logs(
        self,
        log_group_names: List[str],
        search_term: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Correlate logs across multiple AWS services using a common search term.

        Args:
            log_group_names: List of log group names to search
            search_term: Term to search for in logs (request ID, transaction ID, etc.)
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with correlated events
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Validate inputs
        if not log_group_names:
            return json.dumps(
                {"status": "Error", "error": "No log groups specified"}, indent=2
            )

        if not search_term:
            return json.dumps(
                {"status": "Error", "error": "No search term specified"}, indent=2
            )

        # Results dictionary
        results = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "searchTerm": search_term,
            "logGroups": {},
            "correlatedEvents": [],
        }

        # Get relevant logs from each group
        for log_group_name in log_group_names:
            # Use CloudWatch Logs Insights query
            query = f"""
            filter @message like "{search_term}"
            | sort @timestamp asc
            | limit 100
            """

            # Start the query
            query_start_time = time.time()
            start_query_response = self.logs_client.start_query(
                logGroupName=log_group_name,
                startTime=start_ts,
                endTime=end_ts,
                queryString=query,
            )

            query_id = start_query_response["queryId"]

            # Poll for query results
            response = None
            while response is None or response["status"] == "Running":
                await asyncio.sleep(1)  # Wait before checking again
                response = self.logs_client.get_query_results(queryId=query_id)

                # Avoid long-running queries
                if response["status"] == "Running":
                    # Check if we've been running too long (60 seconds)
                    if time.time() - query_start_time > 60:
                        response = {"status": "Timeout", "results": []}
                        break

            # Process results for this log group
            log_group_events = []

            for result in response.get("results", []):
                event = {"logGroup": log_group_name, "timestamp": None, "message": None}

                for field in result:
                    if field["field"] == "@timestamp":
                        event["timestamp"] = field["value"]
                    elif field["field"] == "@message":
                        event["message"] = field["value"]
                    elif field["field"] == "@logStream":
                        event["logStream"] = field["value"]

                if event["timestamp"] and event["message"]:
                    log_group_events.append(event)
                    results["correlatedEvents"].append(event)

            # Store events for this log group
            results["logGroups"][log_group_name] = {
                "eventCount": len(log_group_events),
                "events": log_group_events,
            }

        # Sort all correlated events by timestamp
        results["correlatedEvents"] = sorted(
            results["correlatedEvents"], key=lambda x: x.get("timestamp", "")
        )

        return json.dumps(results, indent=2)

```

--------------------------------------------------------------------------------
/docs/usage.md:
--------------------------------------------------------------------------------

```markdown
# Detailed Usage Guide

## 🌐 Integration with MCP clients (Claude for Desktop, Cursor, Windsurf, etc.) - recommended

AI assistants can leverage this MCP server. To learn more, check out the [AI Integration Guide](./ai-integration.md).

## 🖥️ Running the standalone server directly

The MCP server exposes CloudWatch logs data and analysis tools to AI assistants and MCP clients:

```bash
python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2]
```

The server runs in the foreground by default. To run it in the background, you can use:

```bash
python src/cw-mcp-server/server.py &
```

[Amazon Bedrock AgentCore requires stateless streamable-HTTP servers](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-mcp.html#runtime-mcp-how-it-works) because the Runtime provides session isolation by default. The platform automatically adds a `Mcp-Session-Id` header for any request without it, so MCP clients can maintain connection continuity to the same Amazon Bedrock AgentCore Runtime session.

The server runs in stateful mode by default. To run it in stateless mode, you can use:

```bash
python src/cw-mcp-server/server.py [--profile your-profile] [--region us-west-2] --stateless
```

## 📟 CLI Client (one-off usage)

The project includes a command-line client for interacting with the MCP server:

```bash
# List available log groups
python src/client.py list-groups [--profile your-profile] [--region us-west-2]

# List log groups with a prefix filter
python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2]

# Use the tool interface instead of resource
python src/client.py list-groups --use-tool [--region us-west-2]

# Get a prompt for exploring log groups
python src/client.py list-prompt [--region us-west-2]

# List log streams in a specific log group
python src/client.py list-streams "/aws/lambda/my-function" [--region us-west-2]

# Get log events from a specific stream
python src/client.py get-events "/aws/lambda/my-function" "2023/06/01/[$LATEST]abcdef123456" [--region us-west-2]

# Get a sample of recent logs
python src/client.py sample "/aws/lambda/my-function" [--region us-west-2]

# Get recent errors
python src/client.py recent-errors "/aws/lambda/my-function" [--region us-west-2]

# Get log structure analysis
python src/client.py structure "/aws/lambda/my-function" [--region us-west-2]

# Search logs for a specific pattern
python src/client.py search "/aws/lambda/my-function" "filter @message like 'error'" [--region us-west-2]

# Generate a summary of log activity
python src/client.py summarize "/aws/lambda/my-function" --hours 48 [--region us-west-2]

# Find common error patterns
python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2]

# Correlate logs across multiple services
python src/client.py correlate "/aws/lambda/service1" "/aws/lambda/service2" "OrderId: 12345" [--region us-west-2]
```

*You can use --profile and --region with any command to target a specific AWS account or region.*

## 🧩 Example Workflows

### Finding and analyzing errors in a Lambda function using the standalone server directly

```bash
# 1. List your log groups to find the Lambda function
python src/client.py list-groups --prefix "/aws/lambda" [--region us-west-2]

# 2. Generate a summary to see when errors occurred
python src/client.py summarize "/aws/lambda/my-function" --hours 24 [--region us-west-2]

# 3. Find the most common error patterns
python src/client.py find-errors "/aws/lambda/my-function" [--region us-west-2]

# 4. Search for details about a specific error
python src/client.py search "/aws/lambda/my-function" "filter @message like 'ConnectionError'" [--region us-west-2]
```

### Correlating requests across microservices using the standalone server directly

```bash
# Track a request ID across multiple services
python src/client.py correlate \
  "/aws/lambda/api-gateway" \
  "/aws/lambda/auth-service" \
  "/aws/lambda/payment-processor" \
  "req-abc123" [--region us-west-2]
```

## 🔗 Resource URIs

The MCP server exposes CloudWatch Logs data through the following resource URIs:

| Resource URI | Description |
|--------------|-------------|
| `logs://groups` | List all log groups |
| `logs://groups/filter/{prefix}` | List log groups filtered by prefix |
| `logs://groups/{log_group_name}` | Get details about a specific log group |
| `logs://groups/{log_group_name}/streams` | List streams for a log group |
| `logs://groups/{log_group_name}/streams/{log_stream_name}` | Get events from a specific log stream |
| `logs://groups/{log_group_name}/sample` | Get a sample of recent logs |
| `logs://groups/{log_group_name}/recent-errors` | Get recent errors from a log group |
| `logs://groups/{log_group_name}/metrics` | Get log metrics (volume, frequency) |
| `logs://groups/{log_group_name}/structure` | Analyze log format and structure |
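
Outside the bundled CLI client, any MCP client can read these URIs directly. A minimal sketch using the `mcp` Python SDK (the server path and launch command are assumptions; adjust them to your checkout):

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Launch the server over stdio and read one of the resource URIs listed above
    params = StdioServerParameters(
        command="python", args=["src/cw-mcp-server/server.py"]
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.read_resource("logs://groups")
            print(result)


asyncio.run(main())
```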

## 🧰 Tool Handlers

The server provides the following tool handlers for AI assistants:

| Tool | Description |
|------|-------------|
| `list_log_groups` | List available CloudWatch log groups with filtering options |
| `search_logs` | Execute CloudWatch Logs Insights queries on a single log group |
| `search_logs_multi` | Execute CloudWatch Logs Insights queries across multiple log groups |
| `filter_log_events` | Filter logs by pattern across all streams |
| `summarize_log_activity` | Generate time-based activity summaries |
| `find_error_patterns` | Identify common error patterns |
| `correlate_logs` | Find related events across multiple log groups | 
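
Tools can be invoked the same way. A short sketch that assumes an initialized `ClientSession` as created in the resource example above (the log group name and query are illustrative):

```python
async def find_recent_errors(session: ClientSession) -> None:
    # Call a tool by name with arguments matching its signature
    result = await session.call_tool(
        "search_logs",
        {
            "log_group_name": "/aws/lambda/my-function",
            "query": "filter @message like 'error'",
            "hours": 24,
        },
    )
    print(result)
```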
```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/search_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import boto3
import json
import time
from datetime import datetime
from typing import List

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsSearchTools:
    """Tools for searching and querying CloudWatch Logs."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def search_logs(
        self,
        log_group_name: str,
        query: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Search logs using CloudWatch Logs Insights query.

        Args:
            log_group_name: The log group to search
            query: CloudWatch Logs Insights query syntax
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with search results
        """
        return await self.search_logs_multi(
            [log_group_name], query, hours, start_time, end_time
        )

    @handle_exceptions
    async def search_logs_multi(
        self,
        log_group_names: List[str],
        query: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Search logs across multiple log groups using CloudWatch Logs Insights query.

        Args:
            log_group_names: List of log groups to search
            query: CloudWatch Logs Insights query syntax
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with search results
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)
        # Start the query
        query_start_time = time.time()
        start_query_response = self.logs_client.start_query(
            logGroupNames=log_group_names,
            startTime=start_ts,
            endTime=end_ts,
            queryString=query,
            limit=100,
        )
        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)
            elapsed_time = time.time() - query_start_time

            # Avoid long-running queries
            if response["status"] == "Running":
                # Check if we've been running too long (60 seconds)
                if elapsed_time > 60:
                    return json.dumps(
                        {
                            "status": "Timeout",
                            "error": "Search query failed to complete within time limit",
                        },
                        indent=2,
                    )

        # Process and format the results
        formatted_results = {
            "status": response["status"],
            "statistics": response.get("statistics", {}),
            "searchedLogGroups": log_group_names,
            "results": [],
        }

        for result in response.get("results", []):
            result_dict = {}
            for field in result:
                result_dict[field["field"]] = field["value"]
            formatted_results["results"].append(result_dict)

        return json.dumps(formatted_results, indent=2)

    @handle_exceptions
    async def filter_log_events(
        self,
        log_group_name: str,
        filter_pattern: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Filter log events by pattern across all streams in a log group.

        Args:
            log_group_name: The log group to filter
            filter_pattern: The pattern to search for (CloudWatch Logs filter syntax)
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with filtered events
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)
        response = self.logs_client.filter_log_events(
            logGroupName=log_group_name,
            filterPattern=filter_pattern,
            startTime=start_ts,
            endTime=end_ts,
            limit=100,
        )

        events = response.get("events", [])
        formatted_events = []

        for event in events:
            formatted_events.append(
                {
                    "timestamp": datetime.fromtimestamp(
                        event.get("timestamp", 0) / 1000
                    ).isoformat(),
                    "message": event.get("message"),
                    "logStreamName": event.get("logStreamName"),
                }
            )

        return json.dumps(formatted_events, indent=2)

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/tools/analysis_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import boto3
import json
from datetime import datetime

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsAnalysisTools:
    """Tools for analyzing CloudWatch Logs data."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def summarize_log_activity(
        self,
        log_group_name: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Generate a summary of log activity over a specified time period.

        Args:
            log_group_name: The log group to analyze
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with activity summary
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Use CloudWatch Logs Insights to get a summary
        query = """
        stats count(*) as logEvents,
              count_distinct(@logStream) as streams
        | sort @timestamp desc
        | limit 1000
        """

        # Start the query
        start_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=query,
        )

        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)

        # Get the hourly distribution
        hourly_query = """
        stats count(*) as count by bin(1h)
        | sort @timestamp desc
        | limit 24
        """

        # Start the hourly query
        hourly_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=hourly_query,
        )

        hourly_query_id = hourly_query_response["queryId"]

        # Poll for hourly query results
        hourly_response = None
        while hourly_response is None or hourly_response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            hourly_response = self.logs_client.get_query_results(
                queryId=hourly_query_id
            )

        # Process the main summary results
        summary = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "logEvents": 0,
            "uniqueStreams": 0,
            "hourlyDistribution": [],
        }

        # Extract the main stats
        for result in response.get("results", []):
            for field in result:
                if field["field"] == "logEvents":
                    summary["logEvents"] = int(field["value"])
                elif field["field"] == "streams":
                    summary["uniqueStreams"] = int(field["value"])

        # Extract the hourly distribution
        for result in hourly_response.get("results", []):
            hour_data = {}
            for field in result:
                if field["field"] == "bin(1h)":
                    hour_data["hour"] = field["value"]
                elif field["field"] == "count":
                    hour_data["count"] = int(field["value"])

            if hour_data:
                summary["hourlyDistribution"].append(hour_data)

        return json.dumps(summary, indent=2)

    @handle_exceptions
    async def find_error_patterns(
        self,
        log_group_name: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """
        Find common error patterns in logs.

        Args:
            log_group_name: The log group to analyze
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with error patterns
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Query for error logs
        error_query = """
        filter @message like /(?i)(error|exception|fail|traceback)/
        | stats count(*) as errorCount by @message
        | sort errorCount desc
        | limit 20
        """

        # Start the query
        start_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=error_query,
        )

        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)

        # Process the results
        error_patterns = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "errorPatterns": [],
        }

        for result in response.get("results", []):
            pattern = {}
            for field in result:
                if field["field"] == "@message":
                    pattern["message"] = field["value"]
                elif field["field"] == "errorCount":
                    pattern["count"] = int(field["value"])

            if pattern:
                error_patterns["errorPatterns"].append(pattern)

        return json.dumps(error_patterns, indent=2)

```

--------------------------------------------------------------------------------
/src/cw-mcp-server/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import sys
import os
import argparse
from typing import List, Callable, Any, Type, Optional
from functools import wraps
import asyncio

from mcp.server.fastmcp import FastMCP
from resources.cloudwatch_logs_resource import CloudWatchLogsResource
from tools.search_tools import CloudWatchLogsSearchTools
from tools.analysis_tools import CloudWatchLogsAnalysisTools
from tools.correlation_tools import CloudWatchLogsCorrelationTools

# Parse command line arguments
parser = argparse.ArgumentParser(description="CloudWatch Logs Analyzer MCP Server")
parser.add_argument(
    "--profile", type=str, help="AWS profile name to use for credentials"
)
parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
parser.add_argument(
    "--stateless", action="store_true", help="Stateless HTTP mode", default=False
)
args, unknown = parser.parse_known_args()

# Add the current directory to the path so we can import our modules
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# Create the MCP server for CloudWatch logs
mcp = FastMCP("CloudWatch Logs Analyzer", stateless_http=args.stateless)

# Initialize our resource and tools classes with the specified AWS profile and region
cw_resource = CloudWatchLogsResource(profile_name=args.profile, region_name=args.region)
search_tools = CloudWatchLogsSearchTools(
    profile_name=args.profile, region_name=args.region
)
analysis_tools = CloudWatchLogsAnalysisTools(
    profile_name=args.profile, region_name=args.region
)
correlation_tools = CloudWatchLogsCorrelationTools(
    profile_name=args.profile, region_name=args.region
)

# Capture the parsed CLI profile and region in separate variables
default_profile = args.profile
default_region = args.region


# Helper decorator to handle profile and region parameters for tools
def with_aws_config(tool_class: Type, method_name: Optional[str] = None) -> Callable:
    """
    Decorator that handles the profile and region parameters for tool functions.
    Creates a new instance of the specified tool class with the correct profile and region.

    Args:
        tool_class: The class to instantiate with the profile and region
        method_name: Optional method name if different from the decorated function
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            try:
                profile = kwargs.pop("profile", None) or default_profile
                region = kwargs.pop("region", None) or default_region
                tool_instance = tool_class(profile_name=profile, region_name=region)
                target_method = method_name or func.__name__
                method = getattr(tool_instance, target_method)
                result = method(**kwargs)
                if asyncio.iscoroutine(result):
                    return await result
                return result
            except AttributeError as e:
                raise RuntimeError(
                    f"Method {target_method} not found in {tool_class.__name__}, {e}"
                ) from e
            except Exception as e:
                raise RuntimeError(
                    f"An error {e} occurred while executing {target_method} in {tool_class.__name__}"
                ) from e

        return wrapper

    return decorator


# ==============================
# Resource Handlers
# ==============================


@mcp.resource("logs://groups")
def get_log_groups() -> str:
    """Get a list of all CloudWatch Log Groups"""
    # Use default values for parameters
    prefix = None
    limit = 50
    next_token = None

    return cw_resource.get_log_groups(prefix, limit, next_token)


@mcp.resource("logs://groups/filter/{prefix}")
def get_filtered_log_groups(prefix: str) -> str:
    """
    Get a filtered list of CloudWatch Log Groups by prefix

    Args:
        prefix: The prefix to filter log groups by
    """
    # Default values for other parameters
    limit = 50
    next_token = None

    return cw_resource.get_log_groups(prefix, limit, next_token)


@mcp.resource("logs://groups/{log_group_name}")
def get_log_group_details(log_group_name: str) -> str:
    """Get detailed information about a specific log group"""
    return cw_resource.get_log_group_details(log_group_name)


@mcp.resource("logs://groups/{log_group_name}/streams")
def get_log_streams(log_group_name: str) -> str:
    """
    Get a list of log streams for a specific log group

    Args:
        log_group_name: The name of the log group
    """
    # Use default limit value
    limit = 20
    return cw_resource.get_log_streams(log_group_name, limit)


@mcp.resource("logs://groups/{log_group_name}/streams/{log_stream_name}")
def get_log_events(log_group_name: str, log_stream_name: str) -> str:
    """
    Get log events from a specific log stream

    Args:
        log_group_name: The name of the log group
        log_stream_name: The name of the log stream
    """
    # Use default limit value
    limit = 100
    return cw_resource.get_log_events(log_group_name, log_stream_name, limit)


@mcp.resource("logs://groups/{log_group_name}/sample")
def get_log_sample(log_group_name: str) -> str:
    """
    Get a sample of recent logs from a log group

    Args:
        log_group_name: The name of the log group
    """
    # Use default limit value
    limit = 10
    return cw_resource.get_log_sample(log_group_name, limit)


@mcp.resource("logs://groups/{log_group_name}/recent-errors")
def get_recent_errors(log_group_name: str) -> str:
    """
    Get recent error logs from a log group

    Args:
        log_group_name: The name of the log group
    """
    # Use default hours value
    hours = 24
    return cw_resource.get_recent_errors(log_group_name, hours)


@mcp.resource("logs://groups/{log_group_name}/metrics")
def get_log_metrics(log_group_name: str) -> str:
    """
    Get log volume metrics for a log group

    Args:
        log_group_name: The name of the log group
    """
    # Use default hours value
    hours = 24
    return cw_resource.get_log_metrics(log_group_name, hours)


@mcp.resource("logs://groups/{log_group_name}/structure")
def analyze_log_structure(log_group_name: str) -> str:
    """Analyze and provide information about the structure of logs"""
    return cw_resource.analyze_log_structure(log_group_name)


# ==============================
# Prompts
# ==============================


@mcp.prompt()
def list_cloudwatch_log_groups(
    prefix: str = None, profile: str = None, region: str = None
) -> str:
    """
    Prompt for listing and exploring CloudWatch log groups.

    Args:
        prefix: Optional prefix to filter log groups by name
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls
    """
    profile_text = f" using profile '{profile}'" if profile else ""
    region_text = f" in region '{region}'" if region else ""
    prefix_text = f" with prefix '{prefix}'" if prefix else ""

    return f"""I'll help you explore the CloudWatch log groups in your AWS environment{profile_text}{region_text}.

First, I'll list the available log groups{prefix_text}.

For each log group, I can help you:
1. Get detailed information about the group (retention, size, etc.)
2. Check for recent errors or patterns
3. View metrics like volume and activity
4. Sample recent logs to understand the content
5. Search for specific patterns or events

Let me know which log group you'd like to explore further, or if you'd like to refine the search with a different prefix.
"""


@mcp.prompt()
def analyze_cloudwatch_logs(
    log_group_name: str, profile: str = None, region: str = None
) -> str:
    """
    Prompt for analyzing CloudWatch logs to help identify issues, patterns, and insights.

    Args:
        log_group_name: The name of the log group to analyze
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls
    """
    profile_text = f" using profile '{profile}'" if profile else ""
    region_text = f" in region '{region}'" if region else ""

    return f"""Please analyze the following CloudWatch logs from the {log_group_name} log group{profile_text}{region_text}.

First, I'll get you some information about the log group:
1. Get the basic log group structure to understand the format of logs
2. Check for any recent errors
3. Examine the log volume metrics
4. Analyze a sample of recent logs

Based on this information, please:
- Identify any recurring errors or exceptions
- Look for unusual patterns or anomalies
- Suggest possible root causes for any issues found
- Recommend actions to resolve or mitigate problems
- Provide insights on performance or resource utilization

Feel free to ask for additional context if needed, such as:
- Correlation with logs from other services
- More specific time ranges for analysis
- Queries for specific error messages or events
"""


# ==============================
# Tool Handlers
# ==============================


@mcp.tool()
@with_aws_config(CloudWatchLogsResource, method_name="get_log_groups")
async def list_log_groups(
    prefix: str = None,
    limit: int = 50,
    next_token: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    List available CloudWatch log groups with optional filtering by prefix.

    Args:
        prefix: Optional prefix to filter log groups by name
        limit: Maximum number of log groups to return (default: 50)
        next_token: Token for pagination to get the next set of results
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with log groups information
    """
    # Function body is handled by the decorator
    pass


@mcp.tool()
@with_aws_config(CloudWatchLogsSearchTools)
async def search_logs(
    log_group_name: str,
    query: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Search logs using CloudWatch Logs Insights query.

    Args:
        log_group_name: The log group to search
        query: CloudWatch Logs Insights query syntax
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with search results
    """
    # Function body is handled by the decorator
    pass
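

# Example Logs Insights query (illustrative) for the `query` argument above:
#   fields @timestamp, @message
#   | filter @message like /ERROR/
#   | sort @timestamp desc
#   | limit 20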


@mcp.tool()
@with_aws_config(CloudWatchLogsSearchTools)
async def search_logs_multi(
    log_group_names: List[str],
    query: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Search logs across multiple log groups using CloudWatch Logs Insights.

    Args:
        log_group_names: List of log groups to search
        query: CloudWatch Logs Insights query syntax
        hours: Number of hours to look back (default: 24)
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with search results
    """
    # Function body is handled by the decorator
    pass


@mcp.tool()
@with_aws_config(CloudWatchLogsSearchTools)
async def filter_log_events(
    log_group_name: str,
    filter_pattern: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Filter log events by pattern across all streams in a log group.

    Args:
        log_group_name: The log group to filter
        filter_pattern: The pattern to search for (CloudWatch Logs filter syntax)
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with filtered events
    """
    # Function body is handled by the decorator
    pass
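

# Example filter patterns (illustrative) for the `filter_pattern` argument above:
#   ?ERROR ?Exception        -> events containing either term
#   { $.level = "error" }    -> JSON log events whose "level" field equals "error"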


@mcp.tool()
@with_aws_config(CloudWatchLogsAnalysisTools)
async def summarize_log_activity(
    log_group_name: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Generate a summary of log activity over a specified time period.

    Args:
        log_group_name: The log group to analyze
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with activity summary
    """
    # Function body is handled by the decorator
    pass


@mcp.tool()
@with_aws_config(CloudWatchLogsAnalysisTools)
async def find_error_patterns(
    log_group_name: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Find common error patterns in logs.

    Args:
        log_group_name: The log group to analyze
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with error patterns
    """
    # Function body is handled by the decorator
    pass


@mcp.tool()
@with_aws_config(CloudWatchLogsCorrelationTools)
async def correlate_logs(
    log_group_names: List[str],
    search_term: str,
    hours: int = 24,
    start_time: str = None,
    end_time: str = None,
    profile: str = None,
    region: str = None,
) -> str:
    """
    Correlate logs across multiple AWS services using a common search term.

    Args:
        log_group_names: List of log group names to search
        search_term: Term to search for in logs (request ID, transaction ID, etc.)
        hours: Number of hours to look back
        start_time: Optional ISO8601 start time
        end_time: Optional ISO8601 end time
        profile: Optional AWS profile name to use for credentials
        region: Optional AWS region name to use for API calls

    Returns:
        JSON string with correlated events
    """
    # Function body is handled by the decorator
    pass


if __name__ == "__main__":
    # Run the MCP server
    mcp.run()

```
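
A minimal sketch of calling the `search_logs` tool from an MCP client session, exercising the per-call `profile`/`region` override handled by `with_aws_config` above. The server path, log group name, profile, and region below are placeholder assumptions; the session setup mirrors `src/client.py`.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def demo():
    # Assumed path to the server script; adjust to your checkout.
    params = StdioServerParameters(
        command="python3", args=["src/cw-mcp-server/server.py"]
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "search_logs",
                arguments={
                    "log_group_name": "/aws/lambda/my-function",  # placeholder log group
                    "query": "fields @timestamp, @message | sort @timestamp desc | limit 10",
                    "hours": 6,
                    "profile": "prod",      # per-call override; falls back to --profile
                    "region": "us-west-2",  # per-call override; falls back to --region
                },
            )
            print(result)


if __name__ == "__main__":
    asyncio.run(demo())
```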

--------------------------------------------------------------------------------
/src/cw-mcp-server/resources/cloudwatch_logs_resource.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import re
from collections import Counter


class CloudWatchLogsResource:
    """Resource class for handling CloudWatch Logs resources."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs resource client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Store the profile name and region for later use
        self.profile_name = profile_name
        self.region_name = region_name

        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    def get_log_groups(
        self, prefix: str = None, limit: int = 50, next_token: str = None
    ) -> str:
        """
        Get a list of CloudWatch Log Groups with optional filtering and pagination.

        Args:
            prefix: Optional prefix to filter log groups by name
            limit: Maximum number of log groups to return (default: 50)
            next_token: Token for pagination to get the next set of results

        Returns:
            JSON string with log groups information
        """
        kwargs = {"limit": limit}
        if prefix:
            kwargs["logGroupNamePrefix"] = prefix
        if next_token:
            kwargs["nextToken"] = next_token

        response = self.logs_client.describe_log_groups(**kwargs)
        log_groups = response.get("logGroups", [])

        # Format the log groups information
        formatted_groups = []
        for group in log_groups:
            formatted_groups.append(
                {
                    "name": group.get("logGroupName"),
                    "arn": group.get("arn"),
                    "storedBytes": group.get("storedBytes"),
                    "creationTime": datetime.fromtimestamp(
                        group.get("creationTime", 0) / 1000
                    ).isoformat(),
                }
            )

        # Include the nextToken if available
        result = {"logGroups": formatted_groups}

        if "nextToken" in response:
            result["nextToken"] = response["nextToken"]

        return json.dumps(result, indent=2)

    def get_log_group_details(self, log_group_name: str) -> str:
        """Get detailed information about a specific log group."""
        try:
            response = self.logs_client.describe_log_groups(
                logGroupNamePrefix=log_group_name, limit=1
            )
            log_groups = response.get("logGroups", [])

            # The prefix query can match a different, longer-named group, so
            # require an exact name match before returning details.
            log_group = next(
                (g for g in log_groups if g.get("logGroupName") == log_group_name),
                None,
            )

            if log_group is None:
                return json.dumps(
                    {"error": f"Log group '{log_group_name}' not found"}, indent=2
                )

            # Get retention policy
            retention = "Never Expire"
            if "retentionInDays" in log_group:
                retention = f"{log_group['retentionInDays']} days"

            # Get metrics for the log group
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=1)

            metrics_response = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=["Sum"],
            )

            # Format the detailed information
            details = {
                "name": log_group.get("logGroupName"),
                "arn": log_group.get("arn"),
                "storedBytes": log_group.get("storedBytes"),
                "creationTime": datetime.fromtimestamp(
                    log_group.get("creationTime", 0) / 1000
                ).isoformat(),
                "retentionPolicy": retention,
                "metricFilterCount": log_group.get("metricFilterCount", 0),
                "kmsKeyId": log_group.get("kmsKeyId", "Not encrypted with KMS"),
                "dailyIncomingBytes": [
                    {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                    for point in metrics_response.get("Datapoints", [])
                ],
            }

            return json.dumps(details, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_streams(self, log_group_name: str, limit: int = 20) -> str:
        """
        Get a list of log streams for a specific log group.

        Args:
            log_group_name: The name of the log group
            limit: Maximum number of streams to return (default: 20)
        """
        try:
            response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=limit,
            )

            log_streams = response.get("logStreams", [])
            formatted_streams = []

            for stream in log_streams:
                last_event_time = stream.get("lastEventTimestamp", 0)
                first_event_time = stream.get("firstEventTimestamp", 0)

                formatted_streams.append(
                    {
                        "name": stream.get("logStreamName"),
                        "firstEventTime": datetime.fromtimestamp(
                            first_event_time / 1000
                        ).isoformat()
                        if first_event_time
                        else None,
                        "lastEventTime": datetime.fromtimestamp(
                            last_event_time / 1000
                        ).isoformat()
                        if last_event_time
                        else None,
                        "storedBytes": stream.get("storedBytes"),
                    }
                )

            return json.dumps(formatted_streams, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_events(
        self, log_group_name: str, log_stream_name: str, limit: int = 100
    ) -> str:
        """
        Get log events from a specific log stream.

        Args:
            log_group_name: The name of the log group
            log_stream_name: The name of the log stream
            limit: Maximum number of events to return (default: 100)
        """
        try:
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "ingestionTime": datetime.fromtimestamp(
                            event.get("ingestionTime", 0) / 1000
                        ).isoformat(),
                    }
                )

            return json.dumps(formatted_events, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_sample(self, log_group_name: str, limit: int = 10) -> str:
        """Get a sample of recent logs from a log group."""
        try:
            # First get the most recent stream
            stream_response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=1,
            )

            log_streams = stream_response.get("logStreams", [])
            if not log_streams:
                return json.dumps(
                    {"error": f"No streams found in log group '{log_group_name}'"},
                    indent=2,
                )

            # Get events from the most recent stream
            log_stream_name = log_streams[0].get("logStreamName")
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "streamName": log_stream_name,
                    }
                )

            return json.dumps(
                {
                    "description": f"Sample of {len(formatted_events)} recent logs from '{log_group_name}'",
                    "logStream": log_stream_name,
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_recent_errors(self, log_group_name: str, hours: int = 24) -> str:
        """Get recent error logs from a log group."""
        try:
            # Calculate start time
            end_time = int(datetime.now().timestamp() * 1000)
            start_time = int(
                (datetime.now() - timedelta(hours=hours)).timestamp() * 1000
            )

            # Use filter_log_events to search for errors across all streams
            # Common error patterns to search for
            error_patterns = [
                "ERROR",
                "Error",
                "error",
                "exception",
                "Exception",
                "EXCEPTION",
                "fail",
                "Fail",
                "FAIL",
            ]

            filter_pattern = " ".join([f'"{pattern}"' for pattern in error_patterns])
            response = self.logs_client.filter_log_events(
                logGroupName=log_group_name,
                filterPattern=f"{filter_pattern}",
                startTime=start_time,
                endTime=end_time,
                limit=100,
            )

            events = response.get("events", [])
            formatted_events = []

            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "logStreamName": event.get("logStreamName"),
                    }
                )

            return json.dumps(
                {
                    "description": f"Recent errors from '{log_group_name}' in the last {hours} hours",
                    "totalErrors": len(formatted_events),
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_metrics(self, log_group_name: str, hours: int = 24) -> str:
        """Get log volume metrics for a log group."""
        try:
            # Create CloudWatch client
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")

            # Calculate start and end times
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=hours)

            # Get incoming bytes
            incoming_bytes = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Get incoming log events
            incoming_events = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingLogEvents",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Format metrics data
            bytes_datapoints = incoming_bytes.get("Datapoints", [])
            events_datapoints = incoming_events.get("Datapoints", [])

            bytes_datapoints.sort(key=lambda x: x["Timestamp"])
            events_datapoints.sort(key=lambda x: x["Timestamp"])

            bytes_data = [
                {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                for point in bytes_datapoints
            ]

            events_data = [
                {"timestamp": point["Timestamp"].isoformat(), "events": point["Sum"]}
                for point in events_datapoints
            ]

            # Calculate totals
            total_bytes = sum(point["Sum"] for point in bytes_datapoints)
            total_events = sum(point["Sum"] for point in events_datapoints)

            return json.dumps(
                {
                    "description": f"Log metrics for '{log_group_name}' over the last {hours} hours",
                    "totalBytes": total_bytes,
                    "totalEvents": total_events,
                    "bytesByHour": bytes_data,
                    "eventsByHour": events_data,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def analyze_log_structure(self, log_group_name: str) -> str:
        """Analyze and provide information about the structure of logs."""
        try:
            # Get a sample of logs to analyze
            sample_data = json.loads(self.get_log_sample(log_group_name, 50))

            if "error" in sample_data:
                return json.dumps(sample_data, indent=2)

            events = sample_data.get("events", [])

            if not events:
                return json.dumps(
                    {"error": "No log events found for analysis"}, indent=2
                )

            # Analyze the structure
            structure_info = {
                "description": f"Log structure analysis for '{log_group_name}'",
                "sampleSize": len(events),
                "format": self._detect_log_format(events),
                "commonPatterns": self._extract_common_patterns(events),
                "fieldAnalysis": self._analyze_fields(events),
            }

            return json.dumps(structure_info, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def _detect_log_format(self, events: List[Dict]) -> str:
        """Detect the format of logs (JSON, plaintext, etc.)."""
        json_count = 0
        key_value_count = 0
        xml_count = 0

        for event in events:
            message = event.get("message", "")

            # Check for JSON format
            if message.strip().startswith("{") and message.strip().endswith("}"):
                try:
                    json.loads(message)
                    json_count += 1
                    continue
                except json.JSONDecodeError:
                    pass

            # Check for XML format
            if message.strip().startswith("<") and message.strip().endswith(">"):
                xml_count += 1
                continue

            # Check for key-value pairs
            if re.search(r"\w+=[\'\"][^\'\"]*[\'\"]|\w+=\S+", message):
                key_value_count += 1

        total = len(events)

        if json_count > total * 0.7:
            return "JSON"
        elif xml_count > total * 0.7:
            return "XML"
        elif key_value_count > total * 0.7:
            return "Key-Value Pairs"
        else:
            return "Plaintext/Unstructured"

    def _extract_common_patterns(self, events: List[Dict]) -> Dict:
        """Extract common patterns from log messages."""
        # Look for common log patterns
        level_pattern = re.compile(
            r"\b(DEBUG|INFO|WARN|WARNING|ERROR|FATAL|CRITICAL)\b"
        )
        timestamp_patterns = [
            re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO format
            re.compile(
                r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
            ),  # Common datetime format
            re.compile(r"\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}"),  # MM/DD/YYYY format
        ]

        # Count occurrences
        levels = Counter()
        has_timestamp = 0

        for event in events:
            message = event.get("message", "")

            # Check log levels
            level_match = level_pattern.search(message)
            if level_match:
                levels[level_match.group(0)] += 1

            # Check timestamps in message content (not event timestamp)
            for pattern in timestamp_patterns:
                if pattern.search(message):
                    has_timestamp += 1
                    break

        return {
            "logLevels": dict(levels),
            "containsTimestamp": has_timestamp,
            "timestampPercentage": round((has_timestamp / len(events)) * 100, 2)
            if events
            else 0,
        }

    def _analyze_fields(self, events: List[Dict]) -> Dict:
        """Analyze fields in structured log messages."""
        format_type = self._detect_log_format(events)

        if format_type == "JSON":
            # Try to extract fields from JSON logs
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                try:
                    json_data = json.loads(message)
                    for key in json_data.keys():
                        fields_count[key] += 1
                except json.JSONDecodeError:
                    continue

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        elif format_type == "Key-Value Pairs":
            # Try to extract key-value pairs
            key_pattern = re.compile(r"(\w+)=[\'\"]?([^\'\"\s]*)[\'\"]?")
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                matches = key_pattern.findall(message)
                for key, _ in matches:
                    fields_count[key] += 1

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        else:
            return {
                "analysis": f"Field analysis not applicable for {format_type} format"
            }

```
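
A minimal sketch of paging through all log groups with `CloudWatchLogsResource.get_log_groups` and its `nextToken` contract. The import path, region, and prefix are placeholder assumptions; credentials come from the default chain.

```python
import json

from cloudwatch_logs_resource import CloudWatchLogsResource  # assumed import path

# Region and prefix are placeholders; omit region_name to use the default.
resource = CloudWatchLogsResource(region_name="us-east-1")

groups, token = [], None
while True:
    page = json.loads(
        resource.get_log_groups(prefix="/aws/lambda/", limit=50, next_token=token)
    )
    groups.extend(page.get("logGroups", []))
    token = page.get("nextToken")
    if not token:
        break

print(f"Collected {len(groups)} log groups")
```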

--------------------------------------------------------------------------------
/src/client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import argparse
import json
import sys
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Set up argument parser for the CLI
parser = argparse.ArgumentParser(description="CloudWatch Logs MCP Client")
parser.add_argument(
    "--profile", type=str, help="AWS profile name to use for credentials"
)
parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
subparsers = parser.add_subparsers(dest="command", help="Command to execute")

# List log groups command
list_groups_parser = subparsers.add_parser(
    "list-groups", help="List CloudWatch log groups"
)
list_groups_parser.add_argument("--prefix", help="Filter log groups by name prefix")
list_groups_parser.add_argument(
    "--limit",
    type=int,
    default=50,
    help="Maximum number of log groups to return (default: 50)",
)
list_groups_parser.add_argument(
    "--next-token", help="Token for pagination to get the next set of results"
)
list_groups_parser.add_argument(
    "--use-tool", action="store_true", help="Use the tool interface instead of resource"
)
list_groups_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_groups_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log group details command
group_details_parser = subparsers.add_parser(
    "group-details", help="Get detailed information about a log group"
)
group_details_parser.add_argument("log_group_name", help="The name of the log group")
group_details_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
group_details_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# List log streams command
list_streams_parser = subparsers.add_parser(
    "list-streams", help="List log streams for a specific log group"
)
list_streams_parser.add_argument("log_group_name", help="The name of the log group")
list_streams_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_streams_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# Get log events command
get_events_parser = subparsers.add_parser(
    "get-events", help="Get log events from a specific log stream"
)
get_events_parser.add_argument("log_group_name", help="The name of the log group")
get_events_parser.add_argument("log_stream_name", help="The name of the log stream")
get_events_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
get_events_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log sample command
sample_parser = subparsers.add_parser(
    "sample", help="Get a sample of recent logs from a log group"
)
sample_parser.add_argument("log_group_name", help="The name of the log group")
sample_parser.add_argument(
    "--limit", type=int, default=10, help="Number of logs to sample (default: 10)"
)
sample_parser.add_argument("--profile", help="AWS profile name to use for credentials")
sample_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get recent errors command
errors_parser = subparsers.add_parser(
    "recent-errors", help="Get recent error logs from a log group"
)
errors_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
errors_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
errors_parser.add_argument("--profile", help="AWS profile name to use for credentials")
errors_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log metrics command
metrics_parser = subparsers.add_parser(
    "metrics", help="Get log volume metrics for a log group"
)
metrics_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
metrics_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
metrics_parser.add_argument("--profile", help="AWS profile name to use for credentials")
metrics_parser.add_argument("--region", help="AWS region name to use for API calls")

# Analyze log structure command
structure_parser = subparsers.add_parser(
    "structure", help="Analyze the structure of logs in a log group"
)
structure_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
structure_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
structure_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get analyze logs prompt command
prompt_parser = subparsers.add_parser(
    "get-prompt", help="Get a prompt for analyzing CloudWatch logs"
)
prompt_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
prompt_parser.add_argument("--profile", help="AWS profile name to use for credentials")
prompt_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get list groups prompt command
list_prompt_parser = subparsers.add_parser(
    "list-prompt", help="Get a prompt for listing CloudWatch log groups"
)
list_prompt_parser.add_argument(
    "--prefix", help="Optional prefix to filter log groups by name"
)
list_prompt_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_prompt_parser.add_argument("--region", help="AWS region name to use for API calls")

# Search logs command
search_parser = subparsers.add_parser(
    "search", help="Search for patterns in CloudWatch logs"
)
search_parser.add_argument("log_group_name", help="The name of the log group to search")
search_parser.add_argument(
    "query", help="The search query (CloudWatch Logs Insights syntax)"
)
search_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
search_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
search_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
search_parser.add_argument("--profile", help="AWS profile name to use for credentials")
search_parser.add_argument("--region", help="AWS region name to use for API calls")

# Search multiple log groups command
search_multi_parser = subparsers.add_parser(
    "search-multi", help="Search for patterns across multiple CloudWatch log groups"
)
search_multi_parser.add_argument(
    "log_group_names", nargs="+", help="List of log group names to search"
)
search_multi_parser.add_argument(
    "query", help="The search query (CloudWatch Logs Insights syntax)"
)
search_multi_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
search_multi_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
search_multi_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
search_multi_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
search_multi_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# Summarize log activity command
summarize_parser = subparsers.add_parser(
    "summarize", help="Generate a summary of log activity"
)
summarize_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
summarize_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
summarize_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
summarize_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
summarize_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
summarize_parser.add_argument("--region", help="AWS region name to use for API calls")

# Find error patterns command
errors_parser = subparsers.add_parser(
    "find-errors", help="Find common error patterns in logs"
)
errors_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
errors_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
errors_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
errors_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
errors_parser.add_argument("--profile", help="AWS profile name to use for credentials")
errors_parser.add_argument("--region", help="AWS region name to use for API calls")

# Correlate logs command
correlate_parser = subparsers.add_parser(
    "correlate", help="Correlate logs across multiple AWS services"
)
correlate_parser.add_argument(
    "log_group_names", nargs="+", help="List of log group names to search"
)
correlate_parser.add_argument("search_term", help="Term to search for in logs")
correlate_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
correlate_parser.add_argument(
    "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)"
)
correlate_parser.add_argument(
    "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)"
)
correlate_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
correlate_parser.add_argument("--region", help="AWS region name to use for API calls")


def add_aws_config_args(tool_args, args):
    """Add profile and region arguments to tool calls if specified."""
    if args.profile:
        tool_args["profile"] = args.profile
    if args.region:
        tool_args["region"] = args.region
    return tool_args


async def main():
    """Main function to run the CloudWatch Logs MCP client."""
    args = parser.parse_args()

    # Determine the server path (relative or absolute)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    server_path = os.path.join(script_dir, "cw-mcp-server", "server.py")

    # Prepare server arguments
    server_args = [server_path]
    if args.profile:
        server_args.extend(["--profile", args.profile])
    if args.region:
        server_args.extend(["--region", args.region])

    # Create server parameters
    server_params = StdioServerParameters(command="python3", args=server_args, env=None)

    # Connect to the server
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the client session
            await session.initialize()

            # Check if a command was specified
            if args.command is None:
                parser.print_help()
                return

            try:
                # Execute the requested command
                if args.command == "list-groups":
                    if args.use_tool:
                        # Use the tool interface
                        tool_args = {}
                        if args.prefix:
                            tool_args["prefix"] = args.prefix
                        if args.limit:
                            tool_args["limit"] = args.limit
                        if args.next_token:
                            tool_args["next_token"] = args.next_token
                        tool_args = add_aws_config_args(tool_args, args)

                        result = await session.call_tool(
                            "list_log_groups", arguments=tool_args
                        )
                        print_json_response(result)
                    else:
                        # Use the resource interface
                        # Build query string for parameters if provided
                        if args.prefix:
                            # If prefix is provided, use the filtered endpoint
                            resource_uri = f"logs://groups/filter/{args.prefix}"
                        else:
                            resource_uri = "logs://groups"

                        content, _ = await session.read_resource(resource_uri)
                        print_json_response(content)

                elif args.command == "group-details":
                    resource_uri = f"logs://groups/{args.log_group_name}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "list-streams":
                    resource_uri = f"logs://groups/{args.log_group_name}/streams"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "get-events":
                    resource_uri = f"logs://groups/{args.log_group_name}/streams/{args.log_stream_name}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "sample":
                    resource_uri = (
                        f"logs://groups/{args.log_group_name}/sample?limit={args.limit}"
                    )
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "recent-errors":
                    resource_uri = f"logs://groups/{args.log_group_name}/recent-errors?hours={args.hours}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "metrics":
                    resource_uri = f"logs://groups/{args.log_group_name}/metrics?hours={args.hours}"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "structure":
                    resource_uri = f"logs://groups/{args.log_group_name}/structure"
                    content, _ = await session.read_resource(resource_uri)
                    print_json_response(content)

                elif args.command == "get-prompt":
                    # Get the analyze logs prompt from the server
                    arguments = {"log_group_name": args.log_group_name}
                    arguments = add_aws_config_args(arguments, args)
                    result = await session.get_prompt(
                        "analyze_cloudwatch_logs",
                        arguments=arguments,
                    )

                    # Extract and print the prompt text
                    prompt_messages = result.messages
                    if prompt_messages and len(prompt_messages) > 0:
                        message = prompt_messages[0]
                        if hasattr(message, "content") and hasattr(
                            message.content, "text"
                        ):
                            print(message.content.text)
                        else:
                            print(
                                json.dumps(
                                    message, default=lambda x: x.__dict__, indent=2
                                )
                            )
                    else:
                        print("No prompt received.")

                elif args.command == "list-prompt":
                    # Get arguments for the prompt
                    arguments = {}
                    if args.prefix:
                        arguments["prefix"] = args.prefix
                    arguments = add_aws_config_args(arguments, args)

                    # Get the list logs prompt from the server
                    result = await session.get_prompt(
                        "list_cloudwatch_log_groups", arguments=arguments
                    )

                    # Extract and print the prompt text
                    prompt_messages = result.messages
                    if prompt_messages and len(prompt_messages) > 0:
                        message = prompt_messages[0]
                        if hasattr(message, "content") and hasattr(
                            message.content, "text"
                        ):
                            print(message.content.text)
                        else:
                            print(
                                json.dumps(
                                    message, default=lambda x: x.__dict__, indent=2
                                )
                            )
                    else:
                        print("No prompt received.")

                elif args.command == "search":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                        "query": args.query,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "search_logs",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "search-multi":
                    tool_args = {
                        "log_group_names": args.log_group_names,
                        "query": args.query,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "search_logs_multi",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "summarize":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "summarize_log_activity",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "find-errors":
                    tool_args = {
                        "log_group_name": args.log_group_name,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "find_error_patterns",
                        arguments=tool_args,
                    )
                    print_json_response(result)

                elif args.command == "correlate":
                    tool_args = {
                        "log_group_names": args.log_group_names,
                        "search_term": args.search_term,
                    }
                    if args.start_time:
                        tool_args["start_time"] = args.start_time
                    if args.end_time:
                        tool_args["end_time"] = args.end_time
                    if not (args.start_time or args.end_time):
                        tool_args["hours"] = args.hours
                    tool_args = add_aws_config_args(tool_args, args)
                    result = await session.call_tool(
                        "correlate_logs",
                        arguments=tool_args,
                    )
                    print_json_response(result)

            except Exception as e:
                print(f"Error: {str(e)}", file=sys.stderr)
                sys.exit(1)


def print_json_response(content: str | tuple | object | None):
    """Print JSON content in a formatted way.

    Args:
        content: The content to print, which could be:
            - String (direct JSON content)
            - Tuple (from read_resource, where the first element is the content)
            - Object with .content or .text attributes (from CallToolResult)
            - None
    """
    try:
        # Handle None case
        if content is None:
            print("No content received.")
            return

        # For Session.read_resource responses, which returns tuple (meta, content)
        # but we found that sometimes content is None
        if isinstance(content, tuple):
            meta, content_text = (
                content
                if len(content) >= 2
                else (content[0] if len(content) == 1 else None, None)
            )

            # If we have usable content in the second element, use it
            if content_text is not None:
                content = content_text
            # Otherwise, if meta looks usable, try that
            elif isinstance(meta, str) and meta != "meta":
                content = meta
            # We don't have usable content in the tuple
            else:
                print("No usable content found in the response.")
                return

        # Handle object with content attribute (from CallToolResult)
        if hasattr(content, "content"):
            content = content.content

        # Handle object with text attribute
        if hasattr(content, "text"):
            content = content.text

        # Handle CallToolResult content from mcp_types which can be a list
        if isinstance(content, list) and all(hasattr(item, "text") for item in content):
            # Extract text from each item
            extracted_texts = [item.text for item in content if item.text]
            if extracted_texts:
                content = extracted_texts[0]  # Use the first text element

        # Handle if content is a custom object with __str__ method
        if not isinstance(content, (str, bytes, bytearray)) and hasattr(
            content, "__str__"
        ):
            content = str(content)

        # Try to handle various formats
        if isinstance(content, str):
            try:
                # Try to parse as JSON
                parsed = json.loads(content)
                print(json.dumps(parsed, indent=2))
            except json.JSONDecodeError:
                # Not valid JSON, just print the string
                print(content)
        elif isinstance(content, (dict, list)):
            # Direct Python objects
            print(json.dumps(content, indent=2, default=lambda x: str(x)))
        else:
            # Fall back to string representation
            print(content)

    except Exception as e:
        # Catch-all for any unexpected errors
        print(f"Error processing response: {e}")
        print(content)


if __name__ == "__main__":
    asyncio.run(main())

```