This is page 3 of 3. Use http://codebase.md/sedwardstx/demomcp?page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── .mcp.json
├── check_server.py
├── CLAUDE.md
├── config
│   └── default.yml
├── docs
│   ├── api_reference.md
│   ├── demo-recording
│   │   └── MCPDemo.gif
│   ├── example-context-docs
│   │   ├── mcp-ai-agent-architecture.md
│   │   ├── mcp-ai-agent-dev-task.md
│   │   └── mcp-ai-agent-prd.md
│   └── getting_started.md
├── LICENSE
├── main_tcp.py
├── main.py
├── mcp_tcp_client.py
├── pyproject.toml
├── QUICK_START.md
├── README.md
├── scripts
│   └── test_server.py
├── setup.py
├── src
│   └── mcp_log_analyzer
│       ├── __init__.py
│       ├── api
│       │   ├── __init__.py
│       │   └── server.py
│       ├── config
│       │   ├── __init__.py
│       │   └── settings.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── models.py
│       │   └── state_manager.py
│       ├── mcp_server
│       │   ├── __init__.py
│       │   ├── models
│       │   │   ├── __init__.py
│       │   │   └── schemas.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── linux_testing_prompt.py
│       │   │   ├── log_management_prompt.py
│       │   │   ├── mcp_assets_overview_prompt.py
│       │   │   ├── network_testing_prompt.py
│       │   │   ├── process_monitoring_prompt.py
│       │   │   └── windows_testing_prompt.py
│       │   ├── resources
│       │   │   ├── __init__.py
│       │   │   ├── linux_resources.py
│       │   │   ├── logs_resources.py
│       │   │   ├── network_resources.py
│       │   │   ├── process_resources.py
│       │   │   └── windows_resources.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── health_check_tools.py
│       │       ├── linux_test_tools.py
│       │       ├── log_management_tools.py
│       │       ├── network_test_tools.py
│       │       ├── process_test_tools.py
│       │       └── windows_test_tools.py
│       ├── parsers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── csv_parser.py
│       │   ├── etl_cached_parser.py
│       │   ├── etl_large_file_parser.py
│       │   ├── etl_parser.py
│       │   ├── etl_windows_parser.py
│       │   └── evt_parser.py
│       └── tcp_proxy.py
├── TCP_PROXY_README.md
├── tcp_proxy.py
├── tcp_server.py
├── test_server.py
├── test_tcp_proxy.py
├── test_windows_setup.py
└── tests
    ├── test_base_parser.py
    ├── test_mcp_server.py
    ├── test_tool_utils.py
    └── test_utils.py
```

# Files

--------------------------------------------------------------------------------
/src/mcp_log_analyzer/mcp_server/prompts/log_management_prompt.py:
--------------------------------------------------------------------------------

```python
"""
Log Management prompts for the MCP Log Analyzer server.
"""

from typing import Optional
from mcp.server import FastMCP


def register_log_management_prompts(mcp: FastMCP):
    """Register all log management prompts."""

    @mcp.prompt(
        title="Register Log Source",
        description="Guide for registering new log sources for analysis"
    )
    async def register_log_source_guide() -> str:
        """
        Guide for registering various types of log sources.
        """
        return """
# 💾 Register Log Source Guide

## Tool: register_log_source

### Purpose
Register a new log source for analysis. Supports various log formats.

### Parameters
- **name**: Unique identifier for your log source
- **source_type**: Type of log (evt, json, xml, csv, text)
- **path**: File path or directory containing logs
- **config**: Additional parser configuration (optional)

### Supported Log Types
- **evt**: Windows Event Logs (System, Application, Security)
- **json**: JSON-formatted application logs
- **xml**: XML-structured logs
- **csv**: Comma-separated value logs
- **text**: Plain text logs (syslog, custom formats)

### Usage Examples
```
# Windows System Events
Tool: register_log_source
Parameters: name="windows_system", source_type="evt", path="System"

# JSON Application Logs
Tool: register_log_source
Parameters: name="app_logs", source_type="json", path="/var/log/myapp/app.json"

# Text-based Syslog
Tool: register_log_source
Parameters: name="syslog", source_type="text", path="/var/log/syslog"
```

### Best Practices
✅ Use descriptive names for easy identification
✅ Organize by system and log type
✅ Verify file path exists before registering
✅ Set appropriate parser configurations
"""

    @mcp.prompt(
        title="Query Logs",
        description="Guide for querying and filtering registered log sources"
    )
    async def query_logs_guide(
        filter_type: Optional[str] = None
    ) -> str:
        """
        Guide for querying logs with various filters.
        
        Args:
            filter_type: Filter guide to append ("time", "level", or
                "content"). Any other value adds no filter-specific section.
        """
        
        base_guide = """
# 🔍 Query Logs Guide

## Tool: query_logs

### Purpose
Query and filter logs from registered sources with powerful filtering options.

### Basic Parameters
- **source_name**: Which registered source to query
- **start_time/end_time**: Define time ranges
- **limit/offset**: Paginate through results
- **filters**: Apply specific criteria

### Usage Examples
```
# Query recent errors
Tool: query_logs
Parameters: source_name="windows_system", filters={"level": "Error"}, start_time="1 hour ago"

# Search for specific content
Tool: query_logs
Parameters: source_name="app_logs", filters={"message_contains": "database error"}

# Paginate through results
Tool: query_logs
Parameters: source_name="syslog", limit=50, offset=100
```
"""

        filter_guides = {
            "time": """
### Time-based Filtering
- **Absolute time**: "2024-01-15 10:00:00"
- **Relative time**: "1 hour ago", "24 hours ago"
- **Time ranges**: start_time and end_time
- **Duration shortcuts**: "last_hours": 6

Examples:
```
# Last 24 hours
start_time="24 hours ago"

# Specific date range
start_time="2024-01-15 00:00:00", end_time="2024-01-15 23:59:59"
```
""",
            "level": """
### Level-based Filtering
- **Log levels**: Error, Warning, Info, Debug
- **Windows levels**: Error, Warning, Information
- **Syslog priorities**: 0-7 (emerg to debug)

Examples:
```
filters={"level": "Error"}
filters={"severity": "critical"}
filters={"priority": [0, 1, 2]}  # emerg, alert, crit
```
""",
            "content": """
### Content Filtering
- **Text search**: message_contains
- **Regex patterns**: regex_pattern
- **Field matching**: Exact field values
- **Multiple criteria**: AND/OR conditions

Examples:
```
filters={"message_contains": "authentication failed"}
filters={"regex_pattern": "error.*database.*timeout"}
filters={"event_id": 7001, "source": "Service Control Manager"}
```
"""
        }

        if filter_type and filter_type.lower() in filter_guides:
            base_guide += filter_guides[filter_type.lower()]
        
        base_guide += """
### Performance Tips
✅ Use time ranges to limit data scope
✅ Apply specific filters to reduce noise
✅ Start with recent time periods
✅ Use pagination for large datasets
"""
        
        return base_guide

    @mcp.prompt(
        title="Analyze Logs",
        description="Guide for running log analysis (summary, pattern, anomaly)"
    )
    async def analyze_logs_guide(
        analysis_type: Optional[str] = None
    ) -> str:
        """
        Guide for different types of log analysis.
        
        Args:
            analysis_type: Type of analysis ("summary", "pattern", or
                "anomaly"). If omitted or unrecognized, all three guides
                are included.
        """
        
        base_guide = """
# 📊 Analyze Logs Guide

## Tool: analyze_logs

### Purpose
Perform advanced analysis on logs to identify patterns, anomalies, and trends.

### Analysis Types
- **summary**: General statistics and overview
- **pattern**: Detect recurring patterns and frequencies
- **anomaly**: Identify unusual or suspicious log entries

### Basic Usage
```
Tool: analyze_logs
Parameters: source_name="app_logs", analysis_type="summary"
```
"""

        analysis_guides = {
            "summary": """
### Summary Analysis
Provides high-level overview and statistics.

**What it shows:**
- Total log count and time range
- Error/Warning/Info distribution
- Top sources and components
- Peak activity periods
- Message frequency analysis

**Best for:**
- Initial investigation
- Health assessment
- Capacity planning
- Report generation

**Example:**
```
Tool: analyze_logs
Parameters: source_name="windows_system", analysis_type="summary", time_duration="24h"
```
""",
            "pattern": """
### Pattern Analysis
Detects recurring patterns and correlations.

**What it finds:**
- Frequent error messages
- Event sequences and correlations
- Time-based patterns (hourly, daily)
- Recurring issues
- Common failure modes

**Best for:**
- Root cause analysis
- Predictive maintenance
- Identifying systematic issues
- Performance optimization

**Example:**
```
Tool: analyze_logs
Parameters: source_name="app_logs", analysis_type="pattern", filters={"level": "Error"}
```
""",
            "anomaly": """
### Anomaly Detection
Identifies unusual events and outliers.

**What it detects:**
- Unusual error spikes or drops
- New error types not seen before
- Unexpected source activity
- Timing anomalies
- Statistical outliers

**Best for:**
- Security monitoring
- Early problem detection
- Change detection
- Incident investigation

**Example:**
```
Tool: analyze_logs
Parameters: source_name="security_logs", analysis_type="anomaly", time_duration="48h"
```
"""
        }

        if analysis_type and analysis_type.lower() in analysis_guides:
            base_guide += analysis_guides[analysis_type.lower()]
        else:
            # Show all types if none specified
            for guide in analysis_guides.values():
                base_guide += guide
        
        base_guide += """
### Analysis Strategy
1. Start with summary for overview
2. Use pattern analysis for recurring issues
3. Apply anomaly detection for security
4. Combine analyses for comprehensive insights
"""
        
        return base_guide

    @mcp.prompt(
        title="Manage Log Sources",
        description="Guide for listing, viewing, and deleting log sources"
    )
    async def manage_log_sources() -> str:
        """
        Guide for managing registered log sources.
        """
        return """
# 📋 Manage Log Sources

## Available Management Tools

### List All Sources
```
Tool: list_log_sources
```
Shows all registered log sources with:
- Source names and types
- File paths
- Registration timestamps
- Parser configurations

### Get Source Details
```
Tool: get_log_source
Parameters: name="source_name"
```
Provides detailed information about a specific source:
- Full configuration
- Parser settings
- Access status
- Recent activity

### Delete Log Source
```
Tool: delete_log_source
Parameters: name="source_name"
```
Removes a log source registration:
- Cleans up configuration
- Does not delete actual log files
- Frees up the source name

## Management Best Practices

### Organization
- Use naming conventions (system_component_type)
- Group related sources logically
- Document source purposes
- Regular cleanup of unused sources

### Maintenance
- Verify sources are still accessible
- Update paths after log rotation
- Remove obsolete sources
- Monitor source performance

### Examples
```
# List all sources to review
Tool: list_log_sources

# Check specific source status
Tool: get_log_source
Parameters: name="prod_app_logs"

# Remove old test source
Tool: delete_log_source
Parameters: name="test_logs_old"
```
"""

    @mcp.prompt(
        title="Windows Event Log Setup",
        description="Guide for setting up Windows Event Log sources"
    )
    async def windows_event_setup() -> str:
        """
        Guide for Windows Event Log configuration.
        """
        return """
# 🪟 Windows Event Log Setup

## Registering Windows Event Logs

### System Event Log
```
Tool: register_log_source
Parameters: name="windows_system", source_type="evt", path="System"
```
**Contains**: Hardware, drivers, system services, kernel events

### Application Event Log
```
Tool: register_log_source
Parameters: name="windows_application", source_type="evt", path="Application"
```
**Contains**: Application crashes, errors, informational events

### Security Event Log
```
Tool: register_log_source
Parameters: name="windows_security", source_type="evt", path="Security"
```
**Contains**: Authentication, authorization, audit events
**Note**: Requires administrator privileges

## Common Windows Queries

### Recent System Errors
```
Tool: query_logs
Parameters: source_name="windows_system", filters={"level": "Error"}, start_time="24 hours ago"
```

### Service Failures
```
Tool: query_logs
Parameters: source_name="windows_system", filters={"event_id": [7000, 7001, 7023]}
```

### Application Crashes
```
Tool: query_logs
Parameters: source_name="windows_application", filters={"event_id": 1000}
```

### Failed Logins (Security)
```
Tool: query_logs
Parameters: source_name="windows_security", filters={"event_id": 4625}
```

## Prerequisites
- Windows operating system
- pywin32 package installed
- Administrator rights for Security log
- Appropriate Event Log permissions
"""

    @mcp.prompt(
        title="Structured Log Setup",
        description="Guide for JSON, XML, and CSV log sources"
    )
    async def structured_log_setup(
        format_type: Optional[str] = None
    ) -> str:
        """
        Guide for structured log formats.
        
        Args:
            format_type: Log format type ("json", "xml", or "csv"). If
                omitted or unrecognized, all three guides are included.
        """
        
        base_guide = """
# 📄 Structured Log Setup

## Supported Structured Formats
- **JSON**: JavaScript Object Notation logs
- **XML**: Extensible Markup Language logs
- **CSV**: Comma-Separated Values logs
"""

        format_guides = {
            "json": """
### JSON Log Configuration
```
Tool: register_log_source
Parameters: 
  name="app_json_logs"
  source_type="json"
  path="/var/log/app/application.json"
  config={
    "timestamp_field": "timestamp",
    "level_field": "severity",
    "message_field": "message"
  }
```

**Example JSON Format:**
```json
{
  "timestamp": "2024-01-15T10:30:00Z",
  "severity": "ERROR",
  "message": "Database connection failed",
  "component": "database",
  "error_code": "DB_001"
}
```

**Query Example:**
```
Tool: query_logs
Parameters: 
  source_name="app_json_logs"
  filters={"severity": "ERROR", "component": "database"}
```
""",
            "xml": """
### XML Log Configuration
```
Tool: register_log_source
Parameters:
  name="app_xml_logs"
  source_type="xml"
  path="/var/log/app/events.xml"
  config={
    "root_element": "events",
    "event_element": "event",
    "timestamp_path": "event/timestamp",
    "level_path": "event/level"
  }
```

**Example XML Format:**
```xml
<events>
  <event>
    <timestamp>2024-01-15T10:30:00Z</timestamp>
    <level>ERROR</level>
    <message>Service initialization failed</message>
    <source>ServiceManager</source>
  </event>
</events>
```

**Query Example:**
```
Tool: query_logs
Parameters:
  source_name="app_xml_logs"
  filters={"level": "ERROR", "source": "ServiceManager"}
```
""",
            "csv": """
### CSV Log Configuration
```
Tool: register_log_source
Parameters:
  name="app_csv_logs"
  source_type="csv"
  path="/var/log/app/metrics.csv"
  config={
    "delimiter": ",",
    "has_header": true,
    "timestamp_column": 0,
    "level_column": 2,
    "message_column": 3
  }
```

**Example CSV Format:**
```csv
timestamp,host,level,message,duration_ms
2024-01-15T10:30:00Z,server01,ERROR,Request timeout,5023
2024-01-15T10:30:01Z,server01,INFO,Request processed,245
```

**Query Example:**
```
Tool: query_logs
Parameters:
  source_name="app_csv_logs"
  filters={"level": "ERROR", "host": "server01"}
```
"""
        }

        if format_type and format_type.lower() in format_guides:
            base_guide += format_guides[format_type.lower()]
        else:
            for guide in format_guides.values():
                base_guide += guide
        
        base_guide += """
## Configuration Best Practices
✅ Specify field mappings clearly
✅ Use consistent timestamp formats
✅ Validate log format before registering
✅ Test queries after registration
✅ Document custom field meanings
"""
        
        return base_guide

    @mcp.prompt(
        title="Text Log Setup",
        description="Guide for plain text and custom format logs"
    )
    async def text_log_setup() -> str:
        """
        Guide for text-based log sources.
        """
        return """
# 📝 Text Log Setup

## Registering Text Logs

### Basic Text Log
```
Tool: register_log_source
Parameters:
  name="syslog"
  source_type="text"
  path="/var/log/syslog"
```

### Custom Format Configuration
```
Tool: register_log_source
Parameters:
  name="custom_app_log"
  source_type="text"
  path="/var/log/app/custom.log"
  config={
    "pattern": "(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) \\[(?P<level>\\w+)\\] (?P<message>.*)",
    "timestamp_format": "%Y-%m-%d %H:%M:%S"
  }
```

## Common Text Log Formats

### Syslog Format
```
Jan 15 10:30:45 hostname service[1234]: Error message here
```
**Config**: Built-in syslog parser

### Apache/Nginx Access Logs
```
192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] "GET /api/data HTTP/1.1" 500 1234
```
**Config**: Use pattern matching for fields

### Application Logs
```
2024-01-15 10:30:45 [ERROR] [database] Connection pool exhausted
```
**Config**: Define custom regex pattern

## Query Examples

### Search by Content
```
Tool: query_logs
Parameters:
  source_name="syslog"
  filters={"message_contains": "authentication failed"}
```

### Filter by Pattern
```
Tool: query_logs
Parameters:
  source_name="custom_app_log"
  filters={"regex_pattern": "ERROR.*database.*timeout"}
```

## Parsing Tips
✅ Test regex patterns before registering
✅ Use named capture groups for fields
✅ Handle multi-line log entries
✅ Consider log rotation handling
✅ Validate timestamp parsing
"""

    @mcp.prompt(
        title="Log Analysis Workflow",
        description="Step-by-step workflow for comprehensive log analysis"
    )
    async def log_analysis_workflow() -> str:
        """
        Complete workflow for log analysis tasks.
        """
        return """
# 🔄 Log Analysis Workflow

## Step-by-Step Analysis Process

### 1. Setup Phase
```
# Register your log sources
Tool: register_log_source
Parameters: [appropriate for your log type]

# Verify registration
Tool: list_log_sources
```

### 2. Initial Assessment
```
# Get overview with summary analysis
Tool: analyze_logs
Parameters: source_name="your_source", analysis_type="summary"

# Check recent errors
Tool: query_logs
Parameters: source_name="your_source", filters={"level": "Error"}, start_time="6 hours ago"
```

### 3. Deep Dive Investigation
```
# Find patterns in errors
Tool: analyze_logs
Parameters: source_name="your_source", analysis_type="pattern", filters={"level": "Error"}

# Search for specific issues
Tool: query_logs
Parameters: source_name="your_source", filters={"message_contains": "specific error"}
```

### 4. Anomaly Detection
```
# Check for unusual activity
Tool: analyze_logs
Parameters: source_name="your_source", analysis_type="anomaly", time_duration="48h"
```

### 5. Reporting
- Document findings from summary analysis
- List identified patterns and frequencies
- Note any anomalies detected
- Provide recommendations

## Common Analysis Scenarios

### Performance Investigation
1. Register application logs
2. Query for performance warnings
3. Analyze patterns in slow operations
4. Identify peak problem times

### Security Audit
1. Register security/auth logs
2. Search for failed authentications
3. Detect anomalous access patterns
4. Review privilege escalations

### Error Troubleshooting
1. Register relevant log sources
2. Filter by error level
3. Analyze error patterns
4. Correlate with system events

### Capacity Planning
1. Analyze usage patterns over time
2. Identify growth trends
3. Find resource bottlenecks
4. Project future needs

## Best Practices
✅ Always start with summary analysis
✅ Use time-based filters to focus investigation
✅ Combine multiple analysis types
✅ Document your findings
✅ Clean up test sources when done
"""

    @mcp.prompt(
        title="Log Troubleshooting",
        description="Troubleshooting common log analysis issues"
    )
    async def log_troubleshooting() -> str:
        """
        Troubleshooting guide for common issues.
        """
        return """
# 🔧 Log Troubleshooting Guide

## Registration Issues

### "Log source already exists"
**Solution:**
1. List existing sources: `list_log_sources`
2. Delete if needed: `delete_log_source`
3. Choose a different name

### "File not found"
**Solution:**
1. Verify file path is correct
2. Check file permissions
3. Ensure path is absolute, not relative
4. Test file access with system tools

### "Unsupported source type"
**Solution:**
- Valid types: evt, json, xml, csv, text
- Check spelling and case
- Use "text" for custom formats

## Query Issues

### "No logs returned"
**Possible causes:**
1. Time range too restrictive
2. Filters excluding all data
3. Log source empty in time range
4. Parsing errors

**Solutions:**
- Remove filters and try again
- Expand time range
- Check source has recent data
- Verify log format matches parser

### "Query timeout"
**Solutions:**
1. Reduce time range
2. Add more specific filters
3. Use pagination (limit/offset)
4. Query smaller time windows

### "Invalid filter format"
**Solutions:**
- Check filter field names
- Verify filter syntax
- Use correct data types
- Test filters incrementally

## Analysis Issues

### "Analysis returns empty"
**Check:**
1. Log source contains data
2. Time range includes logs
3. Filters not too restrictive
4. Analysis type is valid

### "Pattern analysis finds nothing"
**Solutions:**
- Increase time range for more data
- Ensure logs have patterns to find
- Check log format consistency
- Try different filter criteria

### "Anomaly detection not working"
**Requirements:**
- Sufficient historical data
- Consistent log format
- Baseline period available
- Varied log content

## Performance Issues

### Slow Queries
- Use specific time ranges
- Apply filters early
- Limit result count
- Index frequently searched fields

### Large Log Files
- Implement log rotation
- Archive old logs
- Use time-based queries
- Consider partitioning

### Memory Issues
- Process in smaller chunks
- Use streaming where possible
- Limit concurrent queries
- Monitor resource usage

## Platform-Specific Issues

### Windows
- **pywin32 missing**: Install with `pip install pywin32`
- **Access denied**: Need admin rights
- **Security log**: Requires elevation

### Linux
- **Permission denied**: Check file permissions
- **Log rotation**: Handle rotated files
- **Different paths**: Check distribution

## Quick Fixes Checklist
- [ ] Verify log source is registered
- [ ] Check file permissions and access
- [ ] Validate time ranges in queries
- [ ] Test with minimal filters first
- [ ] Ensure proper log format
- [ ] Check system resources
- [ ] Review error messages carefully
"""
```
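The text-log prompt above advises testing regex patterns before registering a source. A minimal sketch of such a check follows, using only the standard library and the pattern and timestamp format from the "Custom Format Configuration" example. `parse_line` is a hypothetical helper for illustration, not part of the MCP Log Analyzer; how the server's own parser applies the `pattern` and `timestamp_format` settings is not shown in this file.

```python
import re
from datetime import datetime
from typing import Optional

# Pattern and timestamp format copied from the "Custom Format Configuration"
# example in text_log_setup (with the string escaping removed).
PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"\[(?P<level>\w+)\] (?P<message>.*)"
)
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_line(line: str) -> Optional[dict]:
    """Hypothetical helper: return the named groups, or None if no match."""
    match = PATTERN.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Validate the timestamp by parsing it with the configured format.
    fields["timestamp"] = datetime.strptime(fields["timestamp"], TIMESTAMP_FORMAT)
    return fields


# Sample line taken from the "Application Logs" format in the same prompt.
sample = "2024-01-15 10:30:45 [ERROR] [database] Connection pool exhausted"
parsed = parse_line(sample)
print(parsed)
```

Note that with this pattern the `[database]` component tag ends up inside `message`; capturing it separately would need an extra named group in the pattern.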

--------------------------------------------------------------------------------
/src/mcp_log_analyzer/mcp_server/tools/process_test_tools.py:
--------------------------------------------------------------------------------

```python
"""
Process monitoring and system resource testing MCP tools.
"""

from typing import Any, Dict, List, Optional

import psutil
from mcp.server import FastMCP
from pydantic import BaseModel, Field


class ProcessAnalysisRequest(BaseModel):
    """Request model for process analysis."""

    # The default is None, so the annotation must be Optional[str].
    process_name: Optional[str] = Field(
        None, description="Specific process name to analyze"
    )
    min_cpu_percent: float = Field(0.0, description="Minimum CPU usage threshold")
    min_memory_percent: float = Field(0.0, description="Minimum memory usage threshold")
    max_results: int = Field(20, description="Maximum number of processes to return")
    sort_by: str = Field("cpu", description="Sort by 'cpu', 'memory', or 'pid'")


class SystemResourceRequest(BaseModel):
    """Request model for system resource monitoring."""

    include_network: bool = Field(True, description="Include network statistics")
    include_disk: bool = Field(True, description="Include disk I/O statistics")
    sample_interval: float = Field(1.0, description="Sampling interval in seconds")


class ProcessMonitoringRequest(BaseModel):
    """Request model for process monitoring over time."""

    process_name: str = Field(..., description="Process name to monitor")
    duration_seconds: int = Field(60, description="Monitoring duration in seconds")
    sample_interval: float = Field(5.0, description="Sampling interval in seconds")


def register_process_test_tools(mcp: FastMCP):
    """Register all process testing tools with the MCP server."""

    @mcp.tool()
    async def test_system_resources_access() -> Dict[str, Any]:
        """
        Test system resource monitoring capabilities.

        This tool checks if the system can access various system
        resource information and provides diagnostic data.
        """
        try:
            test_results = {}

            # Test basic system info access
            try:
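                test_results["cpu"] = {
                    "accessible": True,
                    # cpu_count() defaults to logical=True; request physical
                    # cores explicitly (may be None if undetermined).
                    "cpu_count": psutil.cpu_count(logical=False),
                    "cpu_count_logical": psutil.cpu_count(logical=True),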
                    "current_usage": psutil.cpu_percent(interval=0.1),
                }
            except Exception as e:
                test_results["cpu"] = {"accessible": False, "error": str(e)}

            # Test memory access
            try:
                memory = psutil.virtual_memory()
                test_results["memory"] = {
                    "accessible": True,
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "percent_used": memory.percent,
                }
            except Exception as e:
                test_results["memory"] = {"accessible": False, "error": str(e)}

            # Test disk access
            try:
                disk = psutil.disk_usage("/")
                test_results["disk"] = {
                    "accessible": True,
                    "total_gb": round(disk.total / (1024**3), 2),
                    "used_gb": round(disk.used / (1024**3), 2),
                    "free_gb": round(disk.free / (1024**3), 2),
                    "percent_used": round((disk.used / disk.total) * 100, 1),
                }
            except Exception as e:
                test_results["disk"] = {"accessible": False, "error": str(e)}

            # Test network access
            try:
                network = psutil.net_io_counters()
                test_results["network"] = {
                    "accessible": True,
                    "bytes_sent": network.bytes_sent,
                    "bytes_recv": network.bytes_recv,
                    "packets_sent": network.packets_sent,
                    "packets_recv": network.packets_recv,
                }
            except Exception as e:
                test_results["network"] = {"accessible": False, "error": str(e)}

            # Test process enumeration
            try:
                processes = list(psutil.process_iter(["pid", "name"]))
                test_results["processes"] = {
                    "accessible": True,
                    "total_count": len(processes),
                    "sample_processes": [p.info for p in processes[:5]],
                }
            except Exception as e:
                test_results["processes"] = {"accessible": False, "error": str(e)}

            return {
                "status": "completed",
                "psutil_version": psutil.__version__,
                "test_results": test_results,
            }

        except Exception as e:
            return {"error": f"Error testing system resources: {str(e)}"}

    @mcp.tool()
    async def analyze_system_performance(
        request: SystemResourceRequest,
    ) -> Dict[str, Any]:
        """
        Analyze current system performance and resource usage.

        This tool provides a comprehensive analysis of system performance
        including CPU, memory, disk, and network usage patterns.
        """
        try:
            performance_data = {}

            # CPU Analysis
            cpu_percent = psutil.cpu_percent(interval=request.sample_interval)
            cpu_freq = psutil.cpu_freq()
            performance_data["cpu"] = {
                "usage_percent": cpu_percent,
                # Physical cores; cpu_count() alone would return logical cores.
                "core_count": psutil.cpu_count(logical=False),
                "logical_core_count": psutil.cpu_count(logical=True),
                "frequency": {
                    "current": cpu_freq.current if cpu_freq else None,
                    "min": cpu_freq.min if cpu_freq else None,
                    "max": cpu_freq.max if cpu_freq else None,
                },
                "load_average": (
                    psutil.getloadavg() if hasattr(psutil, "getloadavg") else None
                ),
            }

            # Memory Analysis
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()
            performance_data["memory"] = {
                "virtual": {
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_gb": round(memory.used / (1024**3), 2),
                    "percent_used": memory.percent,
                },
                "swap": {
                    "total_gb": round(swap.total / (1024**3), 2),
                    "used_gb": round(swap.used / (1024**3), 2),
                    "percent_used": swap.percent,
                },
            }

            # Disk Analysis
            if request.include_disk:
                disk_usage = psutil.disk_usage("/")
                disk_io = psutil.disk_io_counters()
                performance_data["disk"] = {
                    "usage": {
                        "total_gb": round(disk_usage.total / (1024**3), 2),
                        "used_gb": round(disk_usage.used / (1024**3), 2),
                        "free_gb": round(disk_usage.free / (1024**3), 2),
                        "percent_used": round(
                            (disk_usage.used / disk_usage.total) * 100, 1
                        ),
                    },
                    # The outer conditional already guards against a None
                    # result, so the fields can be read directly.
                    "io_counters": (
                        {
                            "read_bytes": disk_io.read_bytes,
                            "write_bytes": disk_io.write_bytes,
                            "read_count": disk_io.read_count,
                            "write_count": disk_io.write_count,
                        }
                        if disk_io
                        else None
                    ),
                }

            # Network Analysis
            if request.include_network:
                net_io = psutil.net_io_counters()
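                # net_connections() can raise AccessDenied without elevated
                # privileges (e.g. on macOS); report None instead of failing
                # the whole analysis.
                try:
                    net_connections = len(psutil.net_connections())
                except psutil.AccessDenied:
                    net_connections = None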
                performance_data["network"] = {
                    # The outer conditional already guards against a None
                    # result, so the fields can be read directly.
                    "io_counters": (
                        {
                            "bytes_sent": net_io.bytes_sent,
                            "bytes_recv": net_io.bytes_recv,
                            "packets_sent": net_io.packets_sent,
                            "packets_recv": net_io.packets_recv,
                        }
                        if net_io
                        else None
                    ),
                    "active_connections": net_connections,
                }

            # Performance Assessment
            performance_status = "good"
            issues = []

            if cpu_percent > 80:
                performance_status = "concerning"
                issues.append(f"High CPU usage: {cpu_percent}%")
            elif cpu_percent > 60:
                performance_status = "fair"
                issues.append(f"Moderate CPU usage: {cpu_percent}%")

            if memory.percent > 90:
                performance_status = "concerning"
                issues.append(f"High memory usage: {memory.percent}%")
            elif memory.percent > 75:
                if performance_status == "good":
                    performance_status = "fair"
                issues.append(f"Moderate memory usage: {memory.percent}%")

            return {
                "performance_status": performance_status,
                "issues": issues,
                "performance_data": performance_data,
                "sampling_interval": request.sample_interval,
            }

        except Exception as e:
            return {"error": f"Error analyzing system performance: {str(e)}"}

    @mcp.tool()
    async def find_resource_intensive_processes(
        request: ProcessAnalysisRequest,
    ) -> Dict[str, Any]:
        """
        Find processes that are consuming significant system resources.

        This tool identifies processes with high CPU or memory usage
        and provides detailed information for troubleshooting.
        """
        try:
            processes = []

            # Collect process information
            for proc in psutil.process_iter(
                [
                    "pid",
                    "name",
                    "cpu_percent",
                    "memory_percent",
                    "memory_info",
                    "create_time",
                    "status",
                    "cmdline",
                ]
            ):
                try:
                    proc_info = proc.info

                    # Get CPU percentage with brief interval
                    if proc_info["cpu_percent"] is None:
                        proc_info["cpu_percent"] = proc.cpu_percent(interval=0.1)

                    # Apply filters
                    if (
                        request.process_name
                        and request.process_name.lower()
                        not in (proc_info["name"] or "").lower()
                    ):
                        continue

                    if proc_info["cpu_percent"] < request.min_cpu_percent:
                        continue

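                    # memory_percent is None when psutil cannot read it; treat as 0
                    memory_percent = proc_info["memory_percent"] or 0.0
                    proc_info["memory_percent"] = memory_percent
                    if memory_percent < request.min_memory_percent:
                        continue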

                    # Add additional details
                    proc_info["memory_mb"] = (
                        round(proc_info["memory_info"].rss / (1024 * 1024), 1)
                        if proc_info["memory_info"]
                        else 0
                    )
                    proc_info["command_line"] = (
                        " ".join(proc_info["cmdline"][:3])
                        if proc_info["cmdline"]
                        else ""
                    )

                    processes.append(proc_info)

                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            # Sort processes
            if request.sort_by == "cpu":
                processes.sort(key=lambda x: x.get("cpu_percent", 0), reverse=True)
            elif request.sort_by == "memory":
                processes.sort(key=lambda x: x.get("memory_percent", 0), reverse=True)
            elif request.sort_by == "pid":
                processes.sort(key=lambda x: x.get("pid", 0))

            # Limit results
            limited_processes = processes[: request.max_results]

            # Calculate summary statistics
            if processes:
                total_cpu = sum(p.get("cpu_percent", 0) for p in processes)
                total_memory = sum(p.get("memory_percent", 0) for p in processes)
                avg_cpu = total_cpu / len(processes)
                avg_memory = total_memory / len(processes)
            else:
                total_cpu = avg_cpu = total_memory = avg_memory = 0

            return {
                "search_criteria": {
                    "process_name": request.process_name,
                    "min_cpu_percent": request.min_cpu_percent,
                    "min_memory_percent": request.min_memory_percent,
                    "sort_by": request.sort_by,
                },
                "processes": limited_processes,
                "summary": {
                    "total_matching": len(processes),
                    "returned_count": len(limited_processes),
                    "total_cpu_usage": round(total_cpu, 1),
                    "total_memory_usage": round(total_memory, 1),
                    "average_cpu_usage": round(avg_cpu, 1),
                    "average_memory_usage": round(avg_memory, 1),
                },
            }

        except Exception as e:
            return {"error": f"Error finding resource intensive processes: {str(e)}"}

    @mcp.tool()
    async def monitor_process_health(process_name: str) -> Dict[str, Any]:
        """
        Monitor the health and status of a specific process.

        This tool provides detailed information about a specific process
        including resource usage, status, and potential issues.
        """
        try:
            matching_processes = []

            # Find all processes matching the name
            for proc in psutil.process_iter(
                [
                    "pid",
                    "name",
                    "cpu_percent",
                    "memory_percent",
                    "memory_info",
                    "create_time",
                    "status",
                    "cmdline",
                    "num_threads",
                    "connections",
                ]
            ):
                try:
                    if process_name.lower() in (proc.info["name"] or "").lower():
                        proc_info = proc.info.copy()

                        # Get current CPU usage
                        proc_info["current_cpu"] = proc.cpu_percent(interval=0.1)

                        # Add memory in MB
                        proc_info["memory_mb"] = (
                            round(proc_info["memory_info"].rss / (1024 * 1024), 1)
                            if proc_info["memory_info"]
                            else 0
                        )

                        # Get process age
                        from datetime import datetime

                        create_time = datetime.fromtimestamp(proc_info["create_time"])
                        proc_info["age"] = str(datetime.now() - create_time).split(".")[
                            0
                        ]

                        # Count network connections
                        try:
                            connections = proc.connections()
                            proc_info["network_connections"] = len(connections)
                        except (psutil.AccessDenied, psutil.NoSuchProcess):
                            proc_info["network_connections"] = "Access denied"

                        matching_processes.append(proc_info)

                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            if not matching_processes:
                return {
                    "process_name": process_name,
                    "found": False,
                    "message": f"No processes found matching '{process_name}'",
                }

            # Health assessment
            health_issues = []
            total_cpu = sum(p.get("current_cpu", 0) for p in matching_processes)
            total_memory = sum(
                p.get("memory_percent") or 0 for p in matching_processes
            )

            if total_cpu > 50:
                health_issues.append(f"High CPU usage: {total_cpu:.1f}%")
            if total_memory > 20:
                health_issues.append(f"High memory usage: {total_memory:.1f}%")

            # Check for multiple instances
            if len(matching_processes) > 1:
                health_issues.append(
                    f"Multiple instances running: {len(matching_processes)}"
                )

            health_status = "healthy" if not health_issues else "issues_detected"

            return {
                "process_name": process_name,
                "found": True,
                "health_status": health_status,
                "health_issues": health_issues,
                "process_count": len(matching_processes),
                "processes": matching_processes,
                "summary": {
                    "total_cpu_usage": round(total_cpu, 1),
                    "total_memory_usage": round(total_memory, 1),
                    "total_memory_mb": sum(
                        p.get("memory_mb", 0) for p in matching_processes
                    ),
                },
            }

        except Exception as e:
            return {"error": f"Error monitoring process health: {str(e)}"}

    @mcp.tool()
    async def get_system_health_summary() -> Dict[str, Any]:
        """
        Get overall system health summary.

        This tool provides a comprehensive overview of system health
        including resource usage, top processes, and potential issues.
        """
        try:
            from datetime import datetime

            # System resource summary
            cpu_percent = psutil.cpu_percent(interval=1.0)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage("/")

            # Get top processes by CPU and memory
            processes = []
            for proc in psutil.process_iter(
                ["pid", "name", "cpu_percent", "memory_percent"]
            ):
                try:
                    proc_info = proc.info
                    if proc_info["cpu_percent"] is None:
                        proc_info["cpu_percent"] = proc.cpu_percent(interval=0.1)
                    processes.append(proc_info)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            # Top CPU consumers
            top_cpu = sorted(
                processes, key=lambda x: x.get("cpu_percent", 0), reverse=True
            )[:5]

            # Top memory consumers
            top_memory = sorted(
                processes, key=lambda x: x.get("memory_percent") or 0, reverse=True
            )[:5]

            # Health assessment
            health_score = 100
            issues = []

            if cpu_percent > 80:
                health_score -= 30
                issues.append(f"High CPU usage: {cpu_percent}%")
            elif cpu_percent > 60:
                health_score -= 15
                issues.append(f"Moderate CPU usage: {cpu_percent}%")

            if memory.percent > 90:
                health_score -= 25
                issues.append(f"High memory usage: {memory.percent}%")
            elif memory.percent > 75:
                health_score -= 10
                issues.append(f"Moderate memory usage: {memory.percent}%")

            disk_percent = (disk.used / disk.total) * 100
            if disk_percent > 90:
                health_score -= 20
                issues.append(f"High disk usage: {disk_percent:.1f}%")
            elif disk_percent > 80:
                health_score -= 10
                issues.append(f"Moderate disk usage: {disk_percent:.1f}%")

            # Determine overall health status
            if health_score >= 80:
                health_status = "excellent"
            elif health_score >= 60:
                health_status = "good"
            elif health_score >= 40:
                health_status = "fair"
            else:
                health_status = "poor"

            return {
                "health_status": health_status,
                "health_score": max(0, health_score),
                "issues": issues,
                "system_resources": {
                    "cpu_usage_percent": cpu_percent,
                    "memory_usage_percent": memory.percent,
                    "disk_usage_percent": round(disk_percent, 1),
                    "process_count": len(processes),
                },
                "top_processes": {
                    "cpu_consumers": top_cpu,
                    "memory_consumers": top_memory,
                },
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

        except Exception as e:
            return {"error": f"Error getting system health summary: {str(e)}"}

```
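
The scoring rules inside `get_system_health_summary` above can also be expressed as a pure function, which makes the thresholds testable without calling `psutil`. The following is a minimal sketch under that assumption, not part of the repository: the names `score_health`, `_penalty`, and the `*_RULES` tables are hypothetical, and the issue messages format every value to one decimal place, which differs slightly from the original strings.

```python
from typing import List, Tuple

# (threshold_percent, penalty, label), ordered from most to least severe.
# The numbers mirror the checks in get_system_health_summary.
CPU_RULES = [(80, 30, "High"), (60, 15, "Moderate")]
MEMORY_RULES = [(90, 25, "High"), (75, 10, "Moderate")]
DISK_RULES = [(90, 20, "High"), (80, 10, "Moderate")]


def _penalty(rules, value: float, resource: str, issues: List[str]) -> int:
    """Return the penalty for the first threshold exceeded and record an issue."""
    for threshold, penalty, label in rules:
        if value > threshold:
            issues.append(f"{label} {resource} usage: {value:.1f}%")
            return penalty
    return 0


def score_health(cpu: float, memory: float, disk: float) -> Tuple[int, str, List[str]]:
    """Hypothetical helper: map usage percentages to (score, status, issues)."""
    issues: List[str] = []
    score = 100
    score -= _penalty(CPU_RULES, cpu, "CPU", issues)
    score -= _penalty(MEMORY_RULES, memory, "memory", issues)
    score -= _penalty(DISK_RULES, disk, "disk", issues)

    # Same status bands as the tool above
    if score >= 80:
        status = "excellent"
    elif score >= 60:
        status = "good"
    elif score >= 40:
        status = "fair"
    else:
        status = "poor"
    return score, status, issues
```

With these penalties the lowest reachable score is 25 (100 - 30 - 25 - 20), so the `max(0, health_score)` clamp in the tool never changes the result.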

--------------------------------------------------------------------------------
/src/mcp_log_analyzer/parsers/etl_cached_parser.py:
--------------------------------------------------------------------------------

```python
"""ETL parser with CSV caching to avoid repeated conversions."""

import csv
import hashlib
import json
import logging
import os
import platform
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union

from ..core.models import LogRecord, LogSource, LogType
from .base import BaseParser

logger = logging.getLogger(__name__)


class EtlCachedParser(BaseParser):
    """ETL parser that caches CSV conversions for performance."""

    # Class-level cache directory
    _cache_dir: Optional[str] = None
    _cache_registry: Dict[str, Dict[str, Any]] = {}  # Maps ETL file paths to cached CSV paths
    _conversion_locks: Dict[str, Any] = {}  # Prevents concurrent conversions of same file

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize ETL cached parser.

        Args:
            config: Parser configuration.
        """
        super().__init__(config)
        self.tracerpt_path = self._find_tracerpt()
        self._init_cache_dir()

    @classmethod
    def _init_cache_dir(cls) -> None:
        """Initialize the cache directory if not already done."""
        if cls._cache_dir is None:
            # Create cache directory in temp
            cls._cache_dir = os.path.join(tempfile.gettempdir(), "mcp_etl_cache")
            os.makedirs(cls._cache_dir, exist_ok=True)

            # Load cache registry if it exists
            registry_file = os.path.join(cls._cache_dir, "cache_registry.json")
            if os.path.exists(registry_file):
                try:
                    with open(registry_file, "r") as f:
                        cls._cache_registry = json.load(f)
                    # Clean up stale entries
                    cls._cleanup_stale_cache()
                except Exception:
                    cls._cache_registry = {}

    @classmethod
    def _save_cache_registry(cls) -> None:
        """Save the cache registry to disk."""
        if cls._cache_dir is None:
            return
        registry_file = os.path.join(cls._cache_dir, "cache_registry.json")
        try:
            with open(registry_file, "w") as f:
                json.dump(cls._cache_registry, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save cache registry: {e}")

    @classmethod
    def _cleanup_stale_cache(cls) -> None:
        """Remove cache entries for files that no longer exist."""
        stale_entries = []
        for normalized_path, cache_info in cls._cache_registry.items():
            # Only the cached CSV is checked. The registry key is a normalized
            # (resolved, lowercased) path, so it may not match the ETL file's
            # path on disk exactly.
            if not os.path.exists(cache_info.get("csv_path", "")):
                stale_entries.append(normalized_path)
                
        for entry in stale_entries:
            del cls._cache_registry[entry]

        if stale_entries:
            cls._save_cache_registry()

    def _find_tracerpt(self) -> Optional[str]:
        """Find tracerpt.exe on the system."""
        if platform.system() != "Windows":
            return None

        # Common locations for tracerpt.exe
        possible_paths = [
            r"C:\Windows\System32\tracerpt.exe",
            r"C:\Windows\SysWOW64\tracerpt.exe",
        ]

        for path in possible_paths:
            if os.path.exists(path):
                return path

        # Try to find it in PATH
        try:
            result = subprocess.run(
                ["where", "tracerpt.exe"], capture_output=True, text=True, check=False
            )
            if result.returncode == 0 and result.stdout.strip():
                return result.stdout.strip().splitlines()[0].strip()
        except Exception:
            pass

        return None

    def is_available(self) -> bool:
        """Check if ETL parsing is available."""
        return self.tracerpt_path is not None

    def _get_cache_key(self, file_path: str) -> str:
        """Generate a cache key for an ETL file based on path and size."""
        path = Path(file_path)
        # Normalize the path to ensure consistency
        normalized_path = str(path.resolve()).lower()
        stat = path.stat()
        # Key on normalized path and size only; mtime is left out so the cache
        # is preserved when the file is merely touched
        key_data = f"{normalized_path}|{stat.st_size}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cached_csv(self, file_path: str) -> Optional[str]:
        """Get cached CSV path if it exists and is valid."""
        # Normalize the path to match how we store in registry
        normalized_path = str(Path(file_path).resolve()).lower()
        
        if normalized_path not in self._cache_registry:
            return None

        cache_info = self._cache_registry[normalized_path]
        cache_key = self._get_cache_key(file_path)

        # Check if cache is still valid
        if cache_info.get("cache_key") != cache_key:
            # File has changed, invalidate cache
            logger.info(f"ETL file has changed, invalidating cache for {file_path}")
            self._remove_cache_entry(file_path)
            return None

        csv_path = cache_info.get("csv_path")
        if csv_path and os.path.exists(csv_path):
            logger.info(f"Using cached CSV for {file_path}: {csv_path}")
            return str(csv_path)

        # CSV file missing, remove entry
        self._remove_cache_entry(file_path)
        return None

    def _remove_cache_entry(self, file_path: str) -> None:
        """Remove a cache entry and its CSV file."""
        # Normalize the path to match how we store in registry
        normalized_path = str(Path(file_path).resolve()).lower()
        
        if normalized_path in self._cache_registry:
            cache_info = self._cache_registry[normalized_path]
            csv_path = cache_info.get("csv_path")
            if csv_path and os.path.exists(csv_path):
                try:
                    os.remove(csv_path)
                    logger.info(f"Removed cached CSV: {csv_path}")
                except Exception as e:
                    logger.error(f"Failed to remove cached CSV: {e}")
            del self._cache_registry[normalized_path]
            self._save_cache_registry()

    def _convert_etl_to_csv_sync(self, etl_path: str) -> str:
        """Convert ETL to CSV using tracerpt, with locking to prevent concurrent conversions."""
        import threading

        # Use a per-file lock to prevent concurrent conversions of the same file.
        # setdefault replaces the separate check-then-assign, so two threads
        # cannot each install a different lock for the same path.
        lock = self._conversion_locks.setdefault(etl_path, threading.Lock())

        with lock:
            # Check again if CSV was created while waiting for lock
            cached_csv = self._get_cached_csv(etl_path)
            if cached_csv:
                return cached_csv

            # Generate output filename
            cache_key = self._get_cache_key(etl_path)
            csv_filename = f"etl_{cache_key}.csv"
            csv_path = os.path.join(self._cache_dir or tempfile.gettempdir(), csv_filename)

            # Check if the CSV file already exists in cache directory (missed by registry)
            if os.path.exists(csv_path):
                logger.info(f"Found existing CSV file (missed by registry): {csv_path}")
                # Update cache registry with normalized path
                normalized_path = str(Path(etl_path).resolve()).lower()
                file_size_mb = Path(etl_path).stat().st_size / (1024 * 1024)
                self._cache_registry[normalized_path] = {
                    "csv_path": csv_path,
                    "cache_key": cache_key,
                    "converted_at": datetime.now().isoformat(),
                    "file_size_mb": file_size_mb,
                    "conversion_duration_s": 0,  # Unknown
                }
                self._save_cache_registry()
                return csv_path

            logger.info(f"Converting ETL to CSV: {etl_path} -> {csv_path}")

            # Get file size for logging
            file_size_mb = Path(etl_path).stat().st_size / (1024 * 1024)
            logger.info(f"ETL file size: {file_size_mb:.1f} MB")

            # Run tracerpt
            if self.tracerpt_path is None:
                raise RuntimeError("tracerpt.exe not found")
            cmd = [
                self.tracerpt_path,
                etl_path,
                "-o",
                csv_path,
                "-of",
                "CSV",
                "-y",  # Overwrite without prompting
                "-lr",  # Less restrictive; attempt to process badly-formed events
            ]

            start_time = datetime.now()
            logger.info(f"Starting tracerpt conversion at {start_time}")
            logger.info(f"Converting ETL file: {etl_path}")
            logger.info(f"Output CSV: {csv_path}")

            try:
                # Start tracerpt process (threading is already imported above)
                import time

                process = subprocess.Popen(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
                )

                # Monitor thread for progress updates
                def monitor_conversion() -> None:
                    elapsed = 0
                    while process.poll() is None:  # While process is running
                        time.sleep(30)  # Check every 30 seconds
                        elapsed += 30
                        if os.path.exists(csv_path):
                            try:
                                csv_size_mb = os.path.getsize(csv_path) / (1024 * 1024)
                                logger.info(
                                    f"ETL conversion in progress... {elapsed}s elapsed, CSV size: {csv_size_mb:.1f} MB"
                                )
                            except Exception:
                                logger.info(
                                    f"ETL conversion in progress... {elapsed}s elapsed"
                                )
                        else:
                            logger.info(
                                f"ETL conversion in progress... {elapsed}s elapsed, waiting for CSV creation..."
                            )

                # Start monitoring in background thread
                monitor_thread = threading.Thread(
                    target=monitor_conversion, daemon=True
                )
                monitor_thread.start()

                try:
                    # Wait for process to complete with timeout
                    stdout, stderr = process.communicate(
                        timeout=600
                    )  # 10 minute timeout

                    if process.returncode != 0:
                        raise RuntimeError(
                            f"tracerpt failed with code {process.returncode}: {stderr}"
                        )

                except subprocess.TimeoutExpired:
                    # On timeout, ask tracerpt to stop, then force-kill if needed
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()  # Reap the killed process
                    raise RuntimeError("tracerpt conversion timed out after 10 minutes")

                end_time = datetime.now()
                duration = (end_time - start_time).total_seconds()
                logger.info(f"Tracerpt completed in {duration:.1f} seconds")

                # Verify CSV was created
                if not os.path.exists(csv_path):
                    raise RuntimeError("tracerpt completed but produced no output file")

                # Update cache registry with normalized path
                normalized_path = str(Path(etl_path).resolve()).lower()
                self._cache_registry[normalized_path] = {
                    "csv_path": csv_path,
                    "cache_key": cache_key,
                    "converted_at": datetime.now().isoformat(),
                    "file_size_mb": file_size_mb,
                    "conversion_duration_s": duration,
                }
                self._save_cache_registry()

                logger.info(f"Successfully cached ETL conversion: {csv_path}")
                return csv_path

            except Exception:
                # Clean up partial file if it exists
                if os.path.exists(csv_path):
                    try:
                        os.remove(csv_path)
                    except Exception:
                        pass
                raise

    def parse_file(
        self, source: LogSource, file_path: Union[str, Path]
    ) -> Iterator[LogRecord]:
        """Parse ETL log records from a file using cached CSV.

        Args:
            source: The log source information.
            file_path: Path to the ETL file.

        Yields:
            LogRecord objects parsed from the ETL file.
        """
        if not self.is_available():
            raise RuntimeError(
                "Windows ETL parsing is not available. tracerpt.exe not found."
            )

        path = str(Path(file_path))
        if not os.path.exists(path):
            raise FileNotFoundError(f"ETL file not found: {file_path}")

        # Convert to CSV (cached)
        csv_path = self._convert_etl_to_csv_sync(path)

        # Parse CSV file (_parse_csv_file defaults to at most 10000 records)
        yield from self._parse_csv_file(source, csv_path)

    def _parse_csv_file(
        self, source: LogSource, csv_path: str, limit: int = 10000, offset: int = 0
    ) -> Iterator[LogRecord]:
        """Parse records from the cached CSV file.

        Args:
            source: The log source information.
            csv_path: Path to the CSV file.
            limit: Maximum number of records to yield.
            offset: Number of records to skip.

        Yields:
            LogRecord objects.
        """
        records_yielded = 0
        records_skipped = 0

        # newline="" lets the csv module handle line endings inside quoted fields
        with open(csv_path, "r", encoding="utf-8", errors="ignore", newline="") as f:
            reader = csv.DictReader(f)

            for row in reader:
                # Handle offset
                if records_skipped < offset:
                    records_skipped += 1
                    continue

                # Convert and yield record
                log_record = self._convert_csv_row(source, row)
                if log_record:
                    yield log_record
                    records_yielded += 1

                    # Check limit
                    if records_yielded >= limit:
                        break

    def _convert_csv_row(
        self, source: LogSource, row: Dict[str, str]
    ) -> Optional[LogRecord]:
        """Convert a CSV row from tracerpt to a LogRecord.

        Args:
            source: The log source information.
            row: CSV row dictionary.

        Returns:
            LogRecord or None if conversion fails.
        """
        try:
            # Clean up field names (remove alignment underscores)
            clean_data = {}

            for key, value in row.items():
                if key and value:
                    # Remove leading/trailing underscores and spaces
                    clean_key = key.strip().strip("_").lower().replace(" ", "_")
                    clean_value = value.strip()
                    if clean_key and clean_value:
                        clean_data[clean_key] = clean_value

            # Try to parse timestamp from clock_time
            timestamp = None
            if "clock_time" in clean_data:
                # Clock time is in Windows FILETIME format (100-nanosecond intervals since 1601)
                try:
                    filetime = int(clean_data["clock_time"])
                    # Convert to a Unix timestamp (seconds since 1970-01-01 UTC);
                    # fromtimestamp() then yields a naive datetime in local time
                    unix_timestamp = (filetime - 116444736000000000) / 10000000.0
                    timestamp = datetime.fromtimestamp(unix_timestamp)
                except Exception:
                    pass

            return LogRecord(
                source_id=source.id,
                timestamp=timestamp,
                data=clean_data,
            )

        except Exception as e:
            if self.config.get("verbose", False):
                logger.error(f"Failed to convert CSV row: {e}")
            return None

    def parse(
        self,
        path: str,
        filters: Optional[Dict[str, Any]] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 1000,
        offset: int = 0,
    ) -> List[LogRecord]:
        """Parse ETL file with filtering and pagination using cache.

        Args:
            path: Path to the ETL file.
            filters: Optional filters to apply.
            start_time: Optional start time filter.
            end_time: Optional end time filter.
            limit: Maximum number of records to return.
            offset: Number of records to skip.

        Returns:
            List of LogRecord objects.
        """
        # Create a temporary log source for parsing
        temp_source = LogSource(
            name="temp_etl", type=LogType.ETL, path=path, metadata={}
        )

        records: List[LogRecord] = []

        for record in self.parse_file(temp_source, path):
            # Apply time filters
            if start_time and record.timestamp and record.timestamp < start_time:
                continue
            if end_time and record.timestamp and record.timestamp > end_time:
                continue

            # Apply custom filters
            if filters:
                if not self._match_filters(record, filters):
                    continue

            # Collect up to offset + limit matching records; pagination is
            # applied at this level since parse_file doesn't know about filters
            records.append(record)

            if len(records) >= limit + offset:
                break

        # Drop the first `offset` matches and return at most `limit` records
        return records[offset : offset + limit]

    def _match_filters(self, record: LogRecord, filters: Dict[str, Any]) -> bool:
        """Check if a record matches the provided filters.

        Args:
            record: The log record to check.
            filters: Dictionary of filters to apply.

        Returns:
            True if record matches all filters.
        """
        for key, value in filters.items():
            record_value = record.data.get(key)

            if isinstance(value, list):
                if record_value not in value:
                    return False
            else:
                if record_value != value:
                    return False

        return True

    def parse_content(self, source: LogSource, content: str) -> Iterator[LogRecord]:
        """Parse ETL log records from content string.

        Note: ETL files are binary and cannot be parsed from string content.

        Args:
            source: The log source information.
            content: String content (not supported for ETL).

        Raises:
            NotImplementedError: ETL files must be parsed from file.
        """
        raise NotImplementedError(
            "ETL files are binary and must be parsed from file, not string content"
        )

    def validate_file(self, file_path: Union[str, Path]) -> bool:
        """Validate if the file can be parsed by this parser.

        Args:
            file_path: Path to validate.

        Returns:
            True if file appears to be an ETL file.
        """
        path = Path(file_path)

        # Check file extension
        if not str(path).lower().endswith(".etl"):
            return False

        # Check if file exists and is readable
        if not path.exists() or not path.is_file():
            return False

        # Check if we have tracerpt available
        if not self.is_available():
            return False

        return True

    @classmethod
    def cleanup_cache_for_source(cls, source_path: str) -> None:
        """Clean up cached CSV for a specific ETL source.

        Args:
            source_path: Path to the ETL file whose cache should be removed.
        """
        logger.info(f"Cleaning up cache for ETL source: {source_path}")

        # Ensure cache is initialized
        cls._init_cache_dir()

        # Remove cache entry (normalize path first)
        normalized_path = str(Path(source_path).resolve()).lower()
        if normalized_path in cls._cache_registry:
            cache_info = cls._cache_registry[normalized_path]
            csv_path = cache_info.get("csv_path")

            # Remove CSV file
            if csv_path and os.path.exists(csv_path):
                try:
                    os.remove(csv_path)
                    logger.info(f"Removed cached CSV file: {csv_path}")
                except Exception as e:
                    logger.error(f"Failed to remove cached CSV: {e}")

            # Remove from registry
            del cls._cache_registry[normalized_path]
            cls._save_cache_registry()
            logger.info(f"Removed cache registry entry for: {source_path}")

    @classmethod
    def cleanup_all_cache(cls) -> None:
        """Clean up all cached CSV files."""
        logger.info("Cleaning up all ETL cache")

        # Ensure cache is initialized
        cls._init_cache_dir()

        # Remove all CSV files (iterate over a copy: entries are deleted as we go)
        for etl_path in list(cls._cache_registry):
            cls.cleanup_cache_for_source(etl_path)

        # Clear registry
        cls._cache_registry = {}
        cls._save_cache_registry()

```
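
The filter-then-paginate order used by `parse_file_with_filters` can be shown in isolation. The sketch below is not part of the repository: `Record`, `match_filters`, and `paginate_filtered` are hypothetical stand-ins, with a record reduced to its `data` mapping. It assumes the same matching rules as `_match_filters` (a list value means "any of these", a scalar means equality) and counts `offset` against records that passed the filters, not against raw records.

```python
from dataclasses import dataclass, field
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class Record:
    """Hypothetical stand-in for LogRecord: only the `data` mapping."""

    data: Dict[str, Any] = field(default_factory=dict)


def match_filters(record: Record, filters: Dict[str, Any]) -> bool:
    """Return True if every filter matches (list = membership, scalar = equality)."""
    for key, expected in filters.items():
        actual = record.data.get(key)
        if isinstance(expected, list):
            if actual not in expected:
                return False
        elif actual != expected:
            return False
    return True


def paginate_filtered(
    records: Iterable[Record],
    filters: Dict[str, Any],
    limit: int = 1000,
    offset: int = 0,
) -> List[Record]:
    """Filter lazily, then skip `offset` matches and keep at most `limit`."""
    matching: Iterator[Record] = (r for r in records if match_filters(r, filters))
    # islice stops consuming the source once offset + limit matches were seen
    return list(islice(matching, offset, offset + limit))


if __name__ == "__main__":
    levels = ["Error", "Info", "Error", "Warning", "Error"]
    sample = [Record({"level": lvl, "id": i}) for i, lvl in enumerate(levels)]
    page = paginate_filtered(sample, {"level": ["Error", "Warning"]}, limit=2, offset=1)
    print([r.data["id"] for r in page])
```

Because the generator is consumed lazily, the source iterator is not read past the last record needed for the requested page, which mirrors the early `break` in the parser.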

--------------------------------------------------------------------------------
/src/mcp_log_analyzer/mcp_server/prompts/mcp_assets_overview_prompt.py:
--------------------------------------------------------------------------------

```python
"""
MCP Assets Overview prompts for the MCP Log Analyzer server.
"""

from typing import Optional
from mcp.server import FastMCP


def register_mcp_assets_prompts(mcp: FastMCP):
    """Register all MCP assets overview prompts."""

    @mcp.prompt(
        title="Quick Start Guide",
        description="Getting started with MCP Log Analyzer - essential first steps"
    )
    async def quick_start_guide() -> str:
        """
        Quick start guide for new users of MCP Log Analyzer.
        """
        return """
# 🚀 MCP Log Analyzer Quick Start

## Getting Started in 5 Minutes

### Step 1: Test Your Platform
```
# Windows users
Tool: test_windows_event_log_access

# Linux users  
Tool: test_linux_log_access
```

### Step 2: Check System Health
```
# Windows
Tool: get_windows_system_health

# Linux
Tool: get_linux_system_overview

# Cross-platform
Tool: get_system_health_summary
```

### Step 3: Register a Log Source
```
# Windows Event Log
Tool: register_log_source
Parameters: name="windows_system", source_type="evt", path="System"

# Linux syslog
Tool: register_log_source
Parameters: name="syslog", source_type="text", path="/var/log/syslog"
```

### Step 4: Query Your Logs
```
Tool: query_logs
Parameters: source_name="your_source", start_time="1 hour ago"
```

### Step 5: Analyze for Issues
```
Tool: analyze_logs
Parameters: source_name="your_source", analysis_type="summary"
```

## What's Next?
- Use tool-specific prompts for detailed guidance
- Explore resources for real-time monitoring
- Set up regular health checks
- Configure alerts for critical events
"""

    @mcp.prompt(
        title="Core Log Management Tools",
        description="Essential tools for log registration, querying, and analysis"
    )
    async def log_management_tools() -> str:
        """
        Reference for core log management tools.
        """
        return """
# 📊 Core Log Management Tools

## Registration & Management

### register_log_source
**Purpose**: Register new log sources for analysis
**Supports**: Windows Event Logs, JSON, XML, CSV, text files
```
Tool: register_log_source
Parameters: name="app_logs", source_type="json", path="/var/log/app.json"
```

### list_log_sources
**Purpose**: List all registered log sources
```
Tool: list_log_sources
```

### get_log_source
**Purpose**: Get details about a specific log source
```
Tool: get_log_source
Parameters: name="windows_system"
```

### delete_log_source
**Purpose**: Remove a registered log source
```
Tool: delete_log_source
Parameters: name="old_logs"
```

## Querying & Analysis

### query_logs
**Purpose**: Query and filter logs from registered sources
**Features**: Time ranges, filters, pagination
```
Tool: query_logs
Parameters: source_name="app_logs", filters={"level": "Error"}, start_time="1 hour ago"
```

### analyze_logs
**Purpose**: Advanced log analysis with ML capabilities
**Types**: summary, pattern, anomaly
```
Tool: analyze_logs
Parameters: source_name="app_logs", analysis_type="pattern"
```

## Common Use Cases
- Error investigation
- Performance analysis
- Security auditing
- Compliance reporting
- Trend analysis
"""

    @mcp.prompt(
        title="Windows Tools Reference",
        description="Windows-specific diagnostic and monitoring tools"
    )
    async def windows_tools_reference() -> str:
        """
        Complete reference for Windows system tools.
        """
        return """
# 🪟 Windows Tools Reference

## Diagnostic Tools

### test_windows_event_log_access
**Purpose**: Test Windows Event Log access and permissions
**Checks**: System, Application, Security logs; pywin32 availability
```
Tool: test_windows_event_log_access
```

### get_windows_event_log_info
**Purpose**: Get detailed Event Log information
**Parameters**: log_name (System/Application/Security), max_entries
```
Tool: get_windows_event_log_info
Parameters: log_name="System", max_entries=50
```

### query_windows_events_by_criteria
**Purpose**: Query Event Logs with specific filters
**Filters**: event_id, level, time_duration
```
Tool: query_windows_events_by_criteria
Parameters: event_id=7001, level="Error", time_duration="6h"
```

### get_windows_system_health
**Purpose**: Windows health overview from Event Logs
**Analyzes**: Last 24 hours of System and Application logs
```
Tool: get_windows_system_health
```

## Windows Resources
- `windows/system-events/{param}` - System Event logs
- `windows/application-events/{param}` - Application Event logs

## Key Event IDs
- **1074**: System shutdown/restart
- **7000-7034**: Service control events
- **1000**: Application crashes
- **4624/4625**: Logon success/failure
- **6008**: Unexpected shutdown
"""

    @mcp.prompt(
        title="Linux Tools Reference",
        description="Linux-specific diagnostic and monitoring tools"
    )
    async def linux_tools_reference() -> str:
        """
        Complete reference for Linux system tools.
        """
        return """
# 🐧 Linux Tools Reference

## Diagnostic Tools

### test_linux_log_access
**Purpose**: Test Linux log file and systemd journal access
**Checks**: /var/log files, systemd journal, command availability
```
Tool: test_linux_log_access
```

### query_systemd_journal
**Purpose**: Query systemd journal with filters
**Parameters**: service_name, priority, time_duration, max_lines
```
Tool: query_systemd_journal
Parameters: service_name="nginx", priority="err", time_duration="2h"
```

### analyze_linux_services
**Purpose**: Analyze service status and health
**Parameters**: service_pattern, include_failed
```
Tool: analyze_linux_services
Parameters: service_pattern="mysql|postgres", include_failed=true
```

### get_linux_system_overview
**Purpose**: Comprehensive Linux system health
**Provides**: System info, resources, service status, critical errors
```
Tool: get_linux_system_overview
```

## Linux Resources
- `linux/systemd-logs/{param}` - systemd journal logs
- `linux/system-logs/{param}` - Traditional system logs

## Log Priorities
- **emerg (0)**: System unusable
- **alert (1)**: Immediate action
- **crit (2)**: Critical conditions
- **err (3)**: Error conditions
- **warning (4)**: Warning conditions
- **notice (5)**: Normal but significant
- **info (6)**: Informational
- **debug (7)**: Debug messages
"""

    @mcp.prompt(
        title="Process Monitoring Tools",
        description="Cross-platform process and resource monitoring tools"
    )
    async def process_monitoring_tools() -> str:
        """
        Reference for process monitoring tools.
        """
        return """
# 🖥️ Process Monitoring Tools

## Resource Testing

### test_system_resources_access
**Purpose**: Test system resource monitoring capabilities
**Tests**: CPU, memory, disk, network, process access
```
Tool: test_system_resources_access
```

## Performance Analysis

### analyze_system_performance
**Purpose**: Comprehensive performance metrics
**Parameters**: include_network, include_disk, sample_interval
```
Tool: analyze_system_performance
Parameters: include_network=true, sample_interval=5.0
```

### find_resource_intensive_processes
**Purpose**: Identify high CPU/memory consumers
**Parameters**: min_cpu_percent, min_memory_percent, sort_by, max_results
```
Tool: find_resource_intensive_processes
Parameters: min_cpu_percent=20, sort_by="cpu", max_results=10
```

### monitor_process_health
**Purpose**: Monitor specific process health
**Parameters**: process_name (required)
```
Tool: monitor_process_health
Parameters: process_name="postgres"
```

### get_system_health_summary
**Purpose**: Overall system health score and top consumers
**Provides**: Health score (0-100), resource usage, recommendations
```
Tool: get_system_health_summary
```

## Process Resources
- `processes/list` - Running processes with details
- `processes/summary` - Process statistics

## Health Thresholds
- **CPU**: >80% concerning, >95% critical
- **Memory**: >90% concerning, >95% critical
- **Disk**: >85% warning, >95% critical
"""

    @mcp.prompt(
        title="Network Diagnostic Tools",
        description="Network connectivity and troubleshooting tools"
    )
    async def network_diagnostic_tools() -> str:
        """
        Reference for network diagnostic tools.
        """
        return """
# 🌐 Network Diagnostic Tools

## Connectivity Testing

### test_network_tools_availability
**Purpose**: Check if network diagnostic tools are available
**Tests**: ping, ss/netstat, traceroute, dig/nslookup
```
Tool: test_network_tools_availability
```

### test_port_connectivity
**Purpose**: Test specific port connectivity
**Parameters**: ports (list), host, timeout
```
Tool: test_port_connectivity
Parameters: ports=[80, 443, 3306], host="localhost"
```

### test_network_connectivity
**Purpose**: Test connectivity to multiple hosts
**Parameters**: hosts (list), ping_count
```
Tool: test_network_connectivity
Parameters: hosts=["google.com", "8.8.8.8"], ping_count=3
```

## Analysis Tools

### analyze_network_connections
**Purpose**: Analyze active connections and listening ports
**Parameters**: include_listening, include_established, analyze_traffic
```
Tool: analyze_network_connections
Parameters: include_listening=true, analyze_traffic=true
```

### diagnose_network_issues
**Purpose**: Comprehensive network diagnostics
**Tests**: Multiple aspects of network health
```
Tool: diagnose_network_issues
```

## Network Resources
- `network/listening-ports` - Active listening ports
- `network/established-connections` - Active connections
- `network/all-connections` - All connections
- `network/statistics` - Interface statistics
- `network/routing-table` - Routing information
- `network/port/{port}` - Specific port details
"""

    @mcp.prompt(
        title="Resource Parameters Guide",
        description="How to use flexible parameters with MCP resources"
    )
    async def resource_parameters_guide(
        param_type: Optional[str] = None
    ) -> str:
        """
        Guide for using resource parameters.
        
        Args:
            param_type: Type of parameter (time, count, range)
        """
        
        base_guide = """
# 🎯 Resource Parameters Guide

## Overview
Many MCP resources support flexible parameters for filtering and time-based queries.
"""

        param_guides = {
            "time": """
## Time-based Parameters

### Duration Format: `/time/{duration}`
- **Minutes**: `/time/30m`, `/time/45m`
- **Hours**: `/time/1h`, `/time/6h`, `/time/12h`
- **Days**: `/time/1d`, `/time/7d`, `/time/30d`

### Examples
```
# Last hour of Windows events
Resource: windows/system-events/time/1h

# Last 30 minutes of Linux logs
Resource: linux/systemd-logs/time/30m
```
""",
            "count": """
## Count-based Parameters

### Format: `/last/{n}`
- Returns the last N entries
- Useful for quick checks
- Sorted by most recent first

### Examples
```
# Last 100 system events
Resource: windows/system-events/last/100

# Last 50 journal entries
Resource: linux/systemd-logs/last/50
```
""",
            "range": """
## Time Range Parameters

### Format: `/range/{start}/{end}`
- Specific time window queries
- ISO format or readable dates
- Timezone aware

### Examples
```
# Specific hour
Resource: windows/system-events/range/2025-01-07 13:00/2025-01-07 14:00

# Date range
Resource: linux/systemd-logs/range/2025-01-06/2025-01-07
```
"""
        }

        if param_type and param_type.lower() in param_guides:
            base_guide += param_guides[param_type.lower()]
        else:
            for guide in param_guides.values():
                base_guide += guide

        base_guide += """
## Best Practices
- Start with shorter time ranges
- Use count limits for large datasets  
- Combine with tool queries for filtering
- Cache results when analyzing patterns
"""
        
        return base_guide

    @mcp.prompt(
        title="Platform Support Overview",
        description="Cross-platform capabilities and requirements"
    )
    async def platform_support_overview() -> str:
        """
        Overview of platform support and requirements.
        """
        return """
# 🔍 Platform Support Overview

## Windows Support

### Requirements
- Windows 7+ (Server 2008 R2+)
- Python 3.7+
- pywin32 package (pip install pywin32)
- Administrator rights for Security log

### Capabilities
✅ Full Windows Event Log support
✅ Custom application logs (e.g., "Microsoft-Service Fabric/Admin")
✅ Performance counter access
✅ WMI integration
✅ Process and network monitoring

### Limitations
- Security log requires elevation
- Some ETW logs need special permissions
- Remote log access needs credentials

## Linux Support

### Requirements
- Linux kernel 3.10+
- systemd 219+ (for journal)
- Python 3.7+
- psutil package

### Capabilities
✅ systemd journal queries
✅ Traditional log file parsing
✅ Process and resource monitoring
✅ Network diagnostics
✅ Service management

### Distribution Support
- **Full**: Ubuntu 16.04+, Debian 8+, RHEL/CentOS 7+, Fedora 15+
- **Partial**: Older distributions (log files only)
- **Limited**: Non-systemd distributions

## Cross-Platform Features

### Available Everywhere
- Process monitoring (psutil)
- Network analysis
- Log file parsing (JSON, XML, CSV, text)
- Resource usage tracking
- Performance metrics

### Platform-Specific
- **Windows**: Event Logs, WMI, Performance Counters
- **Linux**: systemd journal, syslog, package managers
- **macOS**: Limited support (process/network only)

## Installation
```bash
# Core installation
pip install mcp-log-analyzer

# Windows-specific
pip install "pywin32>=300"

# Development mode
pip install -e ".[dev]"
```
"""

    @mcp.prompt(
        title="Common Workflows",
        description="Step-by-step guides for common analysis tasks"
    )
    async def common_workflows(
        workflow_type: Optional[str] = None
    ) -> str:
        """
        Common workflow patterns for log analysis.
        
        Args:
            workflow_type: Type of workflow (troubleshooting, security, performance)
        """
        
        base_guide = """
# 🔄 Common Workflows

## Choose Your Workflow
Select from troubleshooting, security, or performance analysis workflows.
"""

        workflow_guides = {
            "troubleshooting": """
## 🔧 Troubleshooting Workflow

### 1. Initial Assessment
```
# Check system health
Tool: get_system_health_summary

# Platform-specific health
Tool: get_windows_system_health  # or get_linux_system_overview
```

### 2. Register Relevant Logs
```
# System logs
Tool: register_log_source
Parameters: name="system", source_type="evt", path="System"

# Application logs
Tool: register_log_source
Parameters: name="app", source_type="json", path="/var/log/app.json"
```

### 3. Search for Errors
```
# Recent errors
Tool: query_logs
Parameters: source_name="system", filters={"level": "Error"}, start_time="6 hours ago"

# Specific error patterns
Tool: analyze_logs
Parameters: source_name="app", analysis_type="pattern"
```

### 4. Deep Dive
```
# Find root cause
Tool: analyze_logs
Parameters: source_name="system", analysis_type="anomaly", time_duration="24h"
```
""",
            "security": """
## 🔒 Security Analysis Workflow

### 1. Security Baseline
```
# Windows Security log
Tool: register_log_source
Parameters: name="security", source_type="evt", path="Security"

# Linux auth logs
Tool: query_systemd_journal
Parameters: service_name="sshd", time_duration="24h"
```

### 2. Authentication Analysis
```
# Failed logins (Windows)
Tool: query_logs
Parameters: source_name="security", filters={"event_id": 4625}

# SSH failures (Linux)
Tool: query_systemd_journal
Parameters: priority="warning", time_duration="6h"
```

### 3. Anomaly Detection
```
# Unusual patterns
Tool: analyze_logs
Parameters: source_name="security", analysis_type="anomaly"

# Network connections
Tool: analyze_network_connections
Parameters: analyze_traffic=true
```

### 4. Incident Response
```
# Track specific user
Tool: query_logs
Parameters: filters={"username": "suspicious_user"}

# Timeline analysis
Tool: query_logs
Parameters: start_time="2025-01-07 10:00", end_time="2025-01-07 14:00"
```
""",
            "performance": """
## 📊 Performance Analysis Workflow

### 1. Resource Baseline
```
# Current performance
Tool: analyze_system_performance
Parameters: sample_interval=5.0

# Resource consumers
Tool: find_resource_intensive_processes
Parameters: min_cpu_percent=10, sort_by="cpu"
```

### 2. Historical Analysis
```
# Register performance logs
Tool: register_log_source
Parameters: name="perf_logs", source_type="csv", path="/var/log/performance.csv"

# Query performance data
Tool: query_logs
Parameters: source_name="perf_logs", start_time="7 days ago"
```

### 3. Pattern Detection
```
# Find performance patterns
Tool: analyze_logs
Parameters: source_name="perf_logs", analysis_type="pattern"

# Monitor specific process
Tool: monitor_process_health
Parameters: process_name="database"
```

### 4. Optimization
```
# Identify bottlenecks
Tool: analyze_logs
Parameters: analysis_type="summary", filters={"response_time": ">1000"}

# Resource planning
Resource: processes/summary
```
"""
        }

        if workflow_type and workflow_type.lower() in workflow_guides:
            base_guide += workflow_guides[workflow_type.lower()]
        else:
            base_guide += """
## Available Workflows
- **troubleshooting**: Error investigation and root cause analysis
- **security**: Authentication monitoring and threat detection
- **performance**: Resource usage and optimization

Specify a workflow_type for detailed steps.
"""
        
        return base_guide

    @mcp.prompt(
        title="Tool Categories Overview",
        description="High-level overview of all tool categories"
    )
    async def tool_categories_overview() -> str:
        """
        Overview of all available tool categories.
        """
        return """
# 🛠️ Tool Categories Overview

## 📊 Log Management (6 tools)
**Purpose**: Core functionality for log handling
- Registration and source management
- Querying with advanced filters
- ML-powered analysis
- Multiple format support

## 🪟 Windows Tools (4 tools)
**Purpose**: Windows-specific diagnostics
- Event Log access testing
- Event querying and filtering
- System health assessment
- Service and application monitoring

## 🐧 Linux Tools (4 tools)
**Purpose**: Linux-specific diagnostics
- Log file and journal access
- systemd service analysis
- System overview and health
- Distribution compatibility

## 🖥️ Process Monitoring (5 tools)
**Purpose**: System resource analysis
- Resource access testing
- Performance metrics
- Process identification
- Health scoring

## 🌐 Network Diagnostics (5 tools)
**Purpose**: Network troubleshooting
- Tool availability checks
- Port and host connectivity
- Connection analysis
- Issue diagnosis

## 📋 Resources (15+ endpoints)
**Purpose**: Real-time data access
- System logs and events
- Process information
- Network statistics
- Flexible time parameters

## 💡 Prompts (60+ guides)
**Purpose**: Interactive guidance
- Step-by-step tutorials
- Best practices
- Troubleshooting help
- Platform-specific advice

## Total Assets
- **24** Tools
- **15+** Resources
- **60+** Prompts
- **5** Major categories
- **2** Primary platforms
"""

    @mcp.prompt(
        title="Troubleshooting Guide",
        description="Common issues and solutions when using MCP Log Analyzer"
    )
    async def troubleshooting_guide() -> str:
        """
        Troubleshooting common MCP Log Analyzer issues.
        """
        return """
# 🔧 Troubleshooting Guide

## Installation Issues

### "pywin32 not found" (Windows)
**Solution**:
```bash
pip install "pywin32>=300"
python -c "import win32api"  # Test
```

### "Permission denied" errors
**Windows**: Run as Administrator for Security logs
**Linux**: Add user to systemd-journal group:
```bash
sudo usermod -a -G systemd-journal $USER
```

### "Module not found" errors
**Solution**: Install in development mode
```bash
pip install -e ".[dev]"
```

## Tool Errors

### "No logs returned"
**Possible causes**:
1. Time range too restrictive
2. Filters excluding all data
3. Log source not properly registered
4. No data in specified timeframe

**Debug steps**:
```
# Check source is registered
Tool: list_log_sources

# Try without filters
Tool: query_logs
Parameters: source_name="your_source"

# Expand time range
Tool: query_logs
Parameters: source_name="your_source", start_time="7 days ago"
```

### "Access denied" to logs
**Windows**:
- Event Viewer works but tool doesn't? Check pywin32
- Security log needs admin rights
- Some logs need specific group membership

**Linux**:
- Check file permissions: `ls -la /var/log/`
- Journal access: `groups` should show systemd-journal
- Try with sudo as last resort

### "Analysis returns empty"
**Requirements**:
- Sufficient data (24+ hours for anomaly detection)
- Consistent log format
- Proper time range specification
- Valid analysis type

## Performance Issues

### Slow queries
**Solutions**:
1. Use specific time ranges (not "all time")
2. Add filters to reduce data
3. Use pagination (limit/offset)
4. Query smaller time windows

### High memory usage
**Solutions**:
1. Process logs in chunks
2. Use streaming where available
3. Limit concurrent operations
4. Clear old log sources

## Platform-Specific

### Windows Event Log issues
- Verify Event Log service is running
- Check Event Log size limits
- Clear full Event Logs if needed
- Ensure proper Windows version

### Linux systemd issues
- Verify systemd version (219+)
- Check journal persistence settings
- Ensure journal isn't corrupted
- Verify systemctl works

## Getting Help
1. Check the specific tool's prompt for guidance
2. Use platform-specific testing tools first
3. Verify prerequisites are met
4. Check system logs for errors
5. Report issues with full error messages
"""

    @mcp.prompt(
        title="Best Practices",
        description="Best practices for effective log analysis with MCP"
    )
    async def best_practices() -> str:
        """
        Best practices for using MCP Log Analyzer effectively.
        """
        return """
# 📋 Best Practices

## Log Source Management

### Naming Conventions
✅ Use descriptive, consistent names
```
# Good
name="prod_web_nginx"
name="dev_app_json"
name="windows_system_events"

# Avoid
name="logs1"
name="test"
name="x"
```

### Organization
- Group by environment (prod/dev/test)
- Include service/application name
- Add log type suffix
- Document purpose

### Maintenance
- Regularly review registered sources
- Remove obsolete sources
- Update paths after log rotation
- Monitor source accessibility

## Query Optimization

### Time Ranges
1. Start with recent data (1-6 hours)
2. Expand gradually if needed
3. Use specific ranges for investigations
4. Avoid open-ended queries

### Filtering Strategy
```
# Efficient: Specific filters
filters={"level": "Error", "service": "database"}

# Inefficient: Broad search
filters={"message_contains": "error"}
```

### Pagination
```
# For large datasets
limit=100, offset=0  # First page
limit=100, offset=100  # Second page
```

## Analysis Strategy

### Progressive Analysis
1. **Summary** → Overview
2. **Pattern** → Recurring issues
3. **Anomaly** → Unusual events

### Correlation
- Compare multiple log sources
- Cross-reference timestamps
- Look for cascading failures
- Track error propagation

## Resource Monitoring

### Establish Baselines
- Normal CPU usage
- Typical memory consumption
- Average process count
- Standard network activity

### Regular Health Checks
```
# Daily
Tool: get_system_health_summary

# Weekly
Tool: analyze_logs
Parameters: analysis_type="pattern", time_duration="7d"

# Monthly
Full system audit and capacity review
```

## Security Practices

### Access Control
- Limit Security log access
- Use read-only permissions
- Audit log access
- Rotate credentials

### Sensitive Data
- Filter out passwords
- Mask personal information
- Comply with regulations
- Secure log storage

## Performance Tips

### Query Performance
- Index frequently searched fields
- Archive old logs
- Use appropriate time windows
- Cache analysis results

### System Impact
- Schedule intensive analysis off-peak
- Monitor resource usage during queries
- Use sampling for large datasets
- Implement rate limiting

## Documentation

### What to Document
- Registered log sources and purposes
- Common query patterns
- Known issues and solutions
- Analysis findings
- Custom configurations

### Automation
- Script common workflows
- Set up alerts for critical events
- Automate health reports
- Schedule regular analyses
"""

```
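
The duration strings described in the Resource Parameters Guide above (`30m`, `6h`, `7d`) map naturally onto `datetime.timedelta`. The sketch below is an illustration only: `parse_duration` is a hypothetical helper, not the parser the resource modules actually use, and it assumes that only the three unit suffixes listed in the guide (minutes, hours, days) are valid.

```python
import re
from datetime import timedelta

# One or more digits followed by a single unit suffix: m, h, or d
_DURATION_RE = re.compile(r"(\d+)([mhd])")
_UNIT_TO_KWARG = {"m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Convert a string such as '30m', '6h', or '7d' into a timedelta.

    Raises ValueError for anything outside the assumed format.
    """
    match = _DURATION_RE.fullmatch(text.strip().lower())
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNIT_TO_KWARG[unit]: int(amount)})


if __name__ == "__main__":
    for example in ("30m", "6h", "7d"):
        print(example, "->", parse_duration(example))
```

A caller could subtract the resulting `timedelta` from the current time to obtain a start timestamp for a time-bounded query.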

--------------------------------------------------------------------------------
/tcp_proxy.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
TCP proxy for MCP servers that bridges TCP connections to stdio-based MCP servers.
Enhanced with automatic process restart, heartbeat, and connection resilience.
"""

import asyncio
import argparse
import json
import logging
import time
from typing import Optional, Dict, Any

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MCPProcess:
    """Manages an MCP server subprocess with automatic restart capability."""
    
    def __init__(self, command: list[str], auto_restart: bool = True):
        self.command = command
        self.process: Optional[asyncio.subprocess.Process] = None
        self._closed = False
        self.auto_restart = auto_restart
        self.restart_count = 0
        self.max_restarts = 5
        self.last_restart_time = 0
        self.restart_cooldown = 5.0  # Minimum seconds between restarts
    
    async def start(self):
        """Start the MCP server process."""
        logger.info(f"Starting MCP server: {' '.join(self.command)}")
        self.process = await asyncio.create_subprocess_exec(
            *self.command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=10 * 1024 * 1024  # 10MB limit for large responses
        )
        logger.info(f"MCP server started with PID {self.process.pid}")
        self.last_restart_time = time.time()
    
    async def restart(self) -> bool:
        """Restart the MCP server process if allowed."""
        if not self.auto_restart:
            return False
            
        # Check the restart budget first so we don't sleep through a
        # cooldown only to refuse the restart afterwards
        if self.restart_count >= self.max_restarts:
            logger.error(f"Max restarts ({self.max_restarts}) reached, not restarting")
            return False

        current_time = time.time()
        # Use a shorter cooldown for the first restart; later restarts wait longer
        cooldown = self.restart_cooldown if self.restart_count > 0 else 1.0
        if current_time - self.last_restart_time < cooldown:
            wait_time = cooldown - (current_time - self.last_restart_time)
            logger.info(f"Waiting {wait_time:.1f}s before restart (cooldown)")
            await asyncio.sleep(wait_time)
        
        self.restart_count += 1
        logger.info(f"Restarting MCP server (attempt {self.restart_count}/{self.max_restarts})")
        
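        # Close existing process (skip if it has already exited)
        if self.process and self.process.returncode is None:
            try:
                self.process.terminate()
                await asyncio.wait_for(self.process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                logger.warning("Process didn't terminate, killing it")
                self.process.kill()
                await self.process.wait()
            except ProcessLookupError:
                # Process exited between the returncode check and terminate()
                pass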
        
        self._closed = False
        self.process = None
        
        # Start new process
        try:
            await self.start()
            logger.info("MCP server restarted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to restart MCP server: {e}")
            return False
    
    def is_alive(self) -> bool:
        """Check if the process is still running."""
        return self.process is not None and self.process.returncode is None
    
    async def send_message(self, message: Dict[str, Any]) -> None:
        """Send a message to the MCP server."""
        if self.process is None or self.process.stdin is None:
            raise RuntimeError("MCP process not started")
        
        message_str = json.dumps(message) + '\n'
        self.process.stdin.write(message_str.encode('utf-8'))
        await self.process.stdin.drain()
    
    async def read_message(self) -> Optional[Dict[str, Any]]:
        """Read a message from the MCP server."""
        if self.process is None or self.process.stdout is None:
            raise RuntimeError("MCP process not started")
        
        try:
            # readline() raises ValueError if a line exceeds the stream's buffer
            # limit, so large responses depend on the limit set at process creation
            line = await self.process.stdout.readline()
            if not line:
                return None
            
            message_str = line.decode('utf-8').strip()
            if message_str:
                return json.loads(message_str)
            return None
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON: {e}")
            return None
    
    async def read_stderr(self) -> Optional[str]:
        """Read from stderr (non-blocking)."""
        if self.process is None or self.process.stderr is None:
            return None
        
        try:
            # Try to read stderr without blocking
            line = await asyncio.wait_for(
                self.process.stderr.readline(), 
                timeout=0.1
            )
            if line:
                return line.decode('utf-8').strip()
        except asyncio.TimeoutError:
            pass
        return None
    
    async def close(self) -> None:
        """Close the MCP server process."""
        if self.process is not None and not self._closed:
            self._closed = True
            self.auto_restart = False  # Disable auto-restart on explicit close
            if self.process.stdin:
                self.process.stdin.close()
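            # Only signal a process that is still running; terminate() can
            # raise ProcessLookupError once the process has exited.
            if self.process.returncode is None:
                try:
                    self.process.terminate()
                except ProcessLookupError:
                    pass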
            await self.process.wait()
            logger.info(f"MCP server process {self.process.pid} terminated")


class TCPToMCPBridge:
    """Bridges TCP connections to MCP server processes with heartbeat support."""
    
    def __init__(self, mcp_command: list[str], heartbeat_interval: float = 30.0):
        self.mcp_command = mcp_command
        self.heartbeat_interval = heartbeat_interval
        self.heartbeat_timeout = heartbeat_interval * 2
    
    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Handle a TCP client connection with resilience."""
        addr = writer.get_extra_info('peername')
        logger.info(f"New client connected from {addr}")
        
        # Track connection state
        connection_state = {
            'last_heartbeat_sent': time.time(),
            'last_heartbeat_received': time.time(),
            'connected': True,
            'client_supports_heartbeat': False,
            'mcp_initialized': False,
            'pending_requests': {},  # Track pending requests
            'last_request_id': None,
            'writer': writer  # Store writer for use in process monitor
        }
        
        # Start MCP process for this client
        mcp_process = MCPProcess(self.mcp_command, auto_restart=True)
        
        try:
            await mcp_process.start()
            
            # Don't initialize MCP session here - let the client drive it
            # The client will send the initialize request when ready
            logger.info("MCP process started, waiting for client initialization...")
            
            # Create tasks for bidirectional communication
            tcp_to_mcp_task = asyncio.create_task(
                self._tcp_to_mcp(reader, mcp_process, connection_state)
            )
            mcp_to_tcp_task = asyncio.create_task(
                self._mcp_to_tcp(mcp_process, writer, connection_state)
            )
            stderr_monitor_task = asyncio.create_task(
                self._monitor_stderr(mcp_process)
            )
            heartbeat_task = asyncio.create_task(
                self._heartbeat_loop(writer, mcp_process, connection_state)
            )
            process_monitor_task = asyncio.create_task(
                self._monitor_process(mcp_process, connection_state)
            )
            
            # Wait for any task to complete
            done, pending = await asyncio.wait(
                [tcp_to_mcp_task, mcp_to_tcp_task, heartbeat_task, process_monitor_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # Cancel remaining tasks
            for task in pending:
                task.cancel()
            stderr_monitor_task.cancel()
            
            # Wait for cancellation
            await asyncio.gather(*pending, stderr_monitor_task, return_exceptions=True)
            
        except Exception as e:
            logger.error(f"Error handling client {addr}: {e}", exc_info=True)
        finally:
            logger.info(f"Client {addr} disconnected")
            connection_state['connected'] = False
            await mcp_process.close()
            writer.close()
            await writer.wait_closed()
    
    async def _tcp_to_mcp(self, reader: asyncio.StreamReader, mcp_process: MCPProcess, connection_state: dict) -> None:
        """Forward messages from TCP client to MCP process, handling heartbeats."""
        buffer = b""
        
        while connection_state['connected']:
            try:
                # Read data from TCP
                chunk = await reader.read(4096)
                if not chunk:
                    logger.info("TCP connection closed by client")
                    break
                
                buffer += chunk
                
                # Process complete messages
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    if line:
                        try:
                            message = json.loads(line.decode('utf-8'))
                            
                            # Handle heartbeat messages
                            if message.get('type') == 'heartbeat':
                                connection_state['last_heartbeat_received'] = time.time()
                                connection_state['client_supports_heartbeat'] = True
                                # Don't forward heartbeats to MCP server
                                continue
                            elif message.get('type') == 'handshake':
                                connection_state['client_supports_heartbeat'] = True
                                # Don't forward handshake to MCP server
                                continue
                                
                            logger.debug(f"TCP -> MCP: {message}")
                            
                            # Track request if it has an ID (for potential replay).
                            # Compare against None: 0 is a valid JSON-RPC id.
                            request_id = message.get('id')
                            if request_id is not None:
                                connection_state['pending_requests'][request_id] = message
                                connection_state['last_request_id'] = request_id
                            
                            # Check if MCP process is alive before sending
                            if not mcp_process.is_alive():
                                logger.warning("MCP process is dead, waiting for monitor to restart...")
                                # Don't manually restart here, let the monitor handle it
                                # (this prevents race conditions). Note that this message
                                # is dropped, not queued for the restarted process.
                                await asyncio.sleep(0.5)
                                continue
                            
                            # Always forward messages - let MCP server handle initialization state
                            await mcp_process.send_message(message)
                            
                            # Mark as initialized if this was an initialize request
                            if message.get('method') == 'initialize':
                                connection_state['mcp_initialized'] = True
                            
                        except json.JSONDecodeError as e:
                            logger.error(f"Invalid JSON from TCP client: {e}")
                        except Exception as e:
                            logger.error(f"Error forwarding to MCP: {e}")
                            
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error reading from TCP: {e}")
                break
    
    async def _mcp_to_tcp(self, mcp_process: MCPProcess, writer: asyncio.StreamWriter, connection_state: dict) -> None:
        """Forward messages from MCP process to TCP client."""
        while connection_state['connected']:
            try:
                message = await mcp_process.read_message()
                if message is None:
                    logger.info("MCP process closed - letting monitor handle restart")
                    # Don't restart here, let the monitor handle it
                    # Just wait briefly and check again
                    await asyncio.sleep(0.5)
                    continue
                
                logger.debug(f"MCP -> TCP: {message}")
                
                # Clear pending request if this is a response
                response_id = message.get('id')
                if response_id is not None and response_id in connection_state.get('pending_requests', {}):
                    del connection_state['pending_requests'][response_id]
                    logger.debug(f"Cleared pending request {response_id}")
                
                # Send to TCP client
                message_str = json.dumps(message) + '\n'
                writer.write(message_str.encode('utf-8'))
                await writer.drain()
                
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error forwarding to TCP: {e}")
                # Check if the error is due to closed connection
                if writer.is_closing():
                    logger.info("TCP connection is closed")
                    break
                # Otherwise, might be MCP process issue - let monitor handle it
                if not mcp_process.is_alive():
                    logger.debug("MCP process not alive, waiting for monitor to restart")
                    await asyncio.sleep(0.5)
                    continue
                break
    
    async def _monitor_stderr(self, mcp_process: MCPProcess) -> None:
        """Monitor MCP process stderr for debugging."""
        while True:
            try:
                stderr_line = await mcp_process.read_stderr()
                if stderr_line:
                    logger.warning(f"MCP stderr: {stderr_line}")
                else:
                    await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error reading stderr: {e}")
                await asyncio.sleep(1)
    
    async def _heartbeat_loop(self, writer: asyncio.StreamWriter, mcp_process: MCPProcess, connection_state: dict) -> None:
        """Send periodic heartbeat messages to the client."""
        while connection_state['connected']:
            try:
                await asyncio.sleep(self.heartbeat_interval)
                
                # Only send heartbeats if client supports them
                if connection_state['client_supports_heartbeat']:
                    heartbeat_msg = {
                        'type': 'heartbeat_response',
                        'timestamp': time.time(),
                        'mcp_alive': mcp_process.is_alive()
                    }
                    
                    message_str = json.dumps(heartbeat_msg) + '\n'
                    writer.write(message_str.encode('utf-8'))
                    await writer.drain()
                    
                    connection_state['last_heartbeat_sent'] = time.time()
                    
                    # Check if we've received a heartbeat recently
                    if time.time() - connection_state['last_heartbeat_received'] > self.heartbeat_timeout:
                        logger.warning("Client heartbeat timeout")
                        # Client might be disconnected, but let TCP detect it
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in heartbeat loop: {e}")
                if writer.is_closing():
                    break
    
    async def _initialize_mcp_session(self, mcp_process: MCPProcess, writer: asyncio.StreamWriter, connection_state: dict) -> None:
        """Initialize MCP session after process start/restart."""
        try:
            logger.info("Initializing MCP session...")
            
            # Send MCP initialization message
            init_msg = {
                "jsonrpc": "2.0",
                "method": "initialize",
                "params": {
                    "capabilities": {
                        "tools": True,
                        "resources": True,
                        "prompts": True
                    }
                },
                "id": "init-" + str(time.time())
            }
            
            await mcp_process.send_message(init_msg)
            logger.debug(f"Sent MCP initialize request: {init_msg}")
            
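            # Wait for initialization response. Bound each read as well as the
            # loop, since read_message() otherwise blocks until a line arrives.
            deadline = time.time() + 10  # 10 second timeout
            while time.time() < deadline:
                try:
                    response = await asyncio.wait_for(
                        mcp_process.read_message(),
                        timeout=max(0.0, deadline - time.time())
                    )
                except asyncio.TimeoutError:
                    break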
                if response:
                    logger.debug(f"MCP initialization response: {response}")
                    if response.get("id") == init_msg["id"]:
                        if "result" in response:
                            connection_state['mcp_initialized'] = True
                            logger.info("MCP session initialized successfully")
                            
                            # Send initialized notification (the MCP spec names
                            # this method "notifications/initialized")
                            initialized_msg = {
                                "jsonrpc": "2.0",
                                "method": "notifications/initialized",
                                "params": {}
                            }
                            await mcp_process.send_message(initialized_msg)
                            logger.debug("Sent MCP initialized notification")
                            
                            # Forward the initialization response to the client
                            if writer and not writer.is_closing():
                                response_str = json.dumps(response) + '\n'
                                writer.write(response_str.encode('utf-8'))
                                await writer.drain()
                            
                            return
                        elif "error" in response:
                            logger.error(f"MCP initialization failed: {response['error']}")
                            return
                    else:
                        # Handle other messages during initialization
                        if writer and not writer.is_closing():
                            response_str = json.dumps(response) + '\n'
                            writer.write(response_str.encode('utf-8'))
                            await writer.drain()
                
                await asyncio.sleep(0.1)
            
            logger.error("MCP initialization timed out")
            
        except Exception as e:
            logger.error(f"Error initializing MCP session: {e}")
            connection_state['mcp_initialized'] = False
    
    async def _monitor_process(self, mcp_process: MCPProcess, connection_state: dict) -> None:
        """Monitor the MCP process health and restart if needed."""
        while connection_state['connected']:
            try:
                await asyncio.sleep(2)  # Check every 2 seconds for faster response
                
                if not mcp_process.is_alive() and mcp_process.auto_restart:
                    logger.info("MCP process terminated, restarting to maintain session...")
                    
                    # Store pending requests that might need to be replayed
                    pending_requests = connection_state.get('pending_requests', {}).copy()
                    
                    if await mcp_process.restart():
                        logger.info("MCP process restarted successfully")
                        
                        # Reset initialization state
                        connection_state['mcp_initialized'] = False
                        
                        # Wait a moment for the process to stabilize
                        await asyncio.sleep(0.5)
                        
                        # Check if we have a valid writer
                        writer = connection_state.get('writer')
                        if writer and not writer.is_closing():
                            # Send a notification to the client about the restart
                            restart_notification = {
                                "jsonrpc": "2.0",
                                "method": "$/mcp/serverRestarted",
                                "params": {
                                    "timestamp": time.time(),
                                    "reason": "process_terminated"
                                }
                            }
                            try:
                                notification_str = json.dumps(restart_notification) + '\n'
                                writer.write(notification_str.encode('utf-8'))
                                await writer.drain()
                                logger.info("Notified client of server restart")
                            except Exception as e:
                                logger.error(f"Failed to notify client of restart: {e}")
                            
                            # The client should reinitialize when it receives this notification
                            logger.info("Waiting for client to reinitialize MCP session...")
                        else:
                            logger.warning("No valid writer available for client notification")
                        
                        # Decrement (rather than fully reset) the restart count after a
                        # successful restart so occasional crashes don't exhaust max_restarts
                        if mcp_process.restart_count > 0:
                            mcp_process.restart_count = max(0, mcp_process.restart_count - 1)
                        
                        # Clear old pending requests as they're likely stale after restart
                        if pending_requests:
                            logger.info(f"Clearing {len(pending_requests)} stale pending requests after restart")
                            connection_state['pending_requests'].clear()
                    else:
                        logger.error("Failed to restart MCP process")
                        # If we can't restart, terminate the connection
                        connection_state['connected'] = False
                        break
                        
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in process monitor: {e}")
                await asyncio.sleep(2)


async def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description='TCP proxy for MCP servers')
    parser.add_argument('--host', default='0.0.0.0', 
                       help='Host to bind to (default: 0.0.0.0)')
    parser.add_argument('--port', type=int, default=8080, 
                       help='Port to bind to (default: 8080)')
    parser.add_argument('--heartbeat-interval', type=float, default=30.0,
                       help='Heartbeat interval in seconds (default: 30)')
    parser.add_argument('--auto-restart', action='store_true', default=True,
                       help='Enable automatic MCP process restart (default: enabled)')
    parser.add_argument('--no-auto-restart', dest='auto_restart', action='store_false',
                       help='Disable automatic MCP process restart')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug logging')
    parser.add_argument('mcp_command', nargs='+',
                       help='Command to run the MCP server (e.g., python main.py)')
    
    args = parser.parse_args()
    
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    
    # Create bridge
    bridge = TCPToMCPBridge(args.mcp_command, heartbeat_interval=args.heartbeat_interval)
    
    # Start TCP server
    logger.info(f"Starting TCP proxy on {args.host}:{args.port}")
    logger.info(f"MCP command: {' '.join(args.mcp_command)}")
    logger.info(f"Heartbeat interval: {args.heartbeat_interval}s")
    logger.info(f"Auto-restart: {'enabled' if args.auto_restart else 'disabled'}")
    
    server = await asyncio.start_server(
        bridge.handle_client,
        args.host,
        args.port,
        limit=10 * 1024 * 1024  # 10MB limit for large messages
    )
    
    addr = server.sockets[0].getsockname()
    logger.info(f"TCP proxy listening on {addr[0]}:{addr[1]}")
    
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("TCP proxy stopped by user")
```
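
The proxy above frames every message as one newline-terminated JSON object, consumes `handshake` and `heartbeat` messages itself, and emits its own `heartbeat_response` and `$/mcp/serverRestarted` messages alongside whatever the MCP server sends. A minimal client-side sketch of that framing follows. The helper names (`encode_message`, `split_messages`, `is_proxy_control`, `demo`) are hypothetical and not part of the repository, and the default host and port simply mirror the proxy's `--port` default.

```python
import asyncio
import json
from typing import Any, Dict, List, Tuple


def encode_message(message: Dict[str, Any]) -> bytes:
    """Serialize one message as a single newline-terminated JSON line."""
    return (json.dumps(message) + "\n").encode("utf-8")


def split_messages(buffer: bytes) -> Tuple[List[Dict[str, Any]], bytes]:
    """Split a receive buffer into complete JSON messages and the unparsed tail."""
    messages = []
    while b"\n" in buffer:
        line, buffer = buffer.split(b"\n", 1)
        if line.strip():
            messages.append(json.loads(line.decode("utf-8")))
    return messages, buffer


def is_proxy_control(message: Dict[str, Any]) -> bool:
    """True for messages emitted by the proxy itself, not by the MCP server."""
    return (
        message.get("type") == "heartbeat_response"
        or message.get("method") == "$/mcp/serverRestarted"
    )


async def demo(host: str = "127.0.0.1", port: int = 8080) -> None:
    """Connect, announce heartbeat support, and print whatever comes back."""
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # Neither message is forwarded to the MCP server; both only tell the
        # proxy that this client understands heartbeats.
        writer.write(encode_message({"type": "handshake"}))
        writer.write(encode_message({"type": "heartbeat"}))
        await writer.drain()

        buffer = b""
        while chunk := await reader.read(4096):
            messages, buffer = split_messages(buffer + chunk)
            for message in messages:
                kind = "proxy" if is_proxy_control(message) else "mcp"
                print(f"[{kind}] {message}")
    finally:
        writer.close()
        await writer.wait_closed()
```

To try it against a running proxy, call `asyncio.run(demo())`. The first `heartbeat_response` only arrives after one full heartbeat interval (30 seconds by default), and a real client would also send an `initialize` request and re-send it whenever `$/mcp/serverRestarted` is received.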

--------------------------------------------------------------------------------
/src/mcp_log_analyzer/mcp_server/tools/network_test_tools.py:
--------------------------------------------------------------------------------

```python
"""
Network monitoring and testing MCP tools.
"""

import platform
import socket
import subprocess
from typing import Any, Dict, List

from mcp.server import FastMCP
from pydantic import BaseModel, Field


class NetworkPortTestRequest(BaseModel):
    """Request model for network port testing."""

    ports: List[int] = Field(..., description="List of ports to test")
    host: str = Field("localhost", description="Host to test (default: localhost)")
    timeout: int = Field(5, description="Connection timeout in seconds")


class NetworkConnectivityRequest(BaseModel):
    """Request model for network connectivity testing."""

    hosts: List[str] = Field(
        default_factory=lambda: ["8.8.8.8", "google.com", "github.com"],
        description="List of hosts to test connectivity to",
    )
    ping_count: int = Field(3, description="Number of ping packets to send")


class NetworkAnalysisRequest(BaseModel):
    """Request model for network analysis."""

    include_listening: bool = Field(
        True, description="Include listening ports analysis"
    )
    include_established: bool = Field(
        True, description="Include established connections"
    )
    analyze_traffic: bool = Field(False, description="Analyze network traffic patterns")


def register_network_test_tools(mcp: FastMCP):
    """Register all network testing tools with the MCP server."""

    @mcp.tool()
    async def test_network_tools_availability() -> Dict[str, Any]:
        """
        Test availability of network diagnostic tools.

        This tool checks if essential network tools are available
        on the system for network troubleshooting.
        """
        tools_to_test = {
            "ping": "Network connectivity testing",
            "netstat": "Network connections and statistics",
            "ss": "Modern replacement for netstat (Linux)",
            "nslookup": "DNS lookup utility",
            "curl": "HTTP client for testing web connectivity",
            "wget": "HTTP client alternative",
            "telnet": "TCP connection testing",
            "nc": "Netcat - networking utility",
        }

        available_tools = {}

        for tool, description in tools_to_test.items():
            try:
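                # Test if tool exists and can run. stdin is detached so that
                # interactive tools (e.g. telnet with no arguments) cannot read
                # from the server's own stdin.
                subprocess.run(
                    [tool, "--help"] if tool != "telnet" else [tool],
                    stdin=subprocess.DEVNULL,
                    capture_output=True,
                    text=True,
                    timeout=5,
                )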

                # Most tools return 0 for --help, but some may not
                available_tools[tool] = {
                    "available": True,
                    "description": description,
                    "test_successful": True,
                }

            except FileNotFoundError:
                available_tools[tool] = {
                    "available": False,
                    "description": description,
                    "error": "Command not found",
                }
            except subprocess.TimeoutExpired:
                available_tools[tool] = {
                    "available": True,
                    "description": description,
                    "test_successful": False,
                    "note": "Tool exists but test timed out",
                }
            except Exception as e:
                available_tools[tool] = {
                    "available": False,
                    "description": description,
                    "error": str(e),
                }

        # Count available tools
        available_count = sum(
            1 for tool in available_tools.values() if tool["available"]
        )
        total_count = len(tools_to_test)

        return {
            "platform": platform.system(),
            "tools_summary": {
                "total_tools": total_count,
                "available_tools": available_count,
                "availability_percentage": round(
                    (available_count / total_count) * 100, 1
                ),
            },
            "tools": available_tools,
        }

    @mcp.tool()
    async def test_port_connectivity(
        request: NetworkPortTestRequest,
    ) -> Dict[str, Any]:
        """
        Test connectivity to specific ports.

        This tool tests whether specific ports are open and accepting
        connections on the specified host.
        """
        try:
            import time  # Local import: this module has no top-level time import

            test_results = {}

            for port in request.ports:
                try:
                    # The context manager closes the socket even if connecting raises
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(request.timeout)

                        start_time = time.perf_counter()
                        result = sock.connect_ex((request.host, port))
                        end_time = time.perf_counter()

                    if result == 0:
                        test_results[port] = {
                            "status": "open",
                            "accessible": True,
                            "response_time_ms": round(
                                (end_time - start_time) * 1000, 2
                            ),
                        }
                    else:
                        test_results[port] = {
                            "status": "closed",
                            "accessible": False,
                            "error_code": result,
                        }

                except socket.timeout:
                    test_results[port] = {
                        "status": "timeout",
                        "accessible": False,
                        "error": f"Connection timed out after {request.timeout} seconds",
                    }
                except Exception as e:
                    test_results[port] = {
                        "status": "error",
                        "accessible": False,
                        "error": str(e),
                    }

            # Summary statistics
            open_ports = [p for p, r in test_results.items() if r["accessible"]]
            closed_ports = [p for p, r in test_results.items() if not r["accessible"]]

            return {
                "target_host": request.host,
                "timeout_seconds": request.timeout,
                "summary": {
                    "total_ports_tested": len(request.ports),
                    "open_ports": len(open_ports),
                    "closed_ports": len(closed_ports),
                    "open_port_list": open_ports,
                },
                "detailed_results": test_results,
            }

        except Exception as e:
            return {"error": f"Error testing port connectivity: {str(e)}"}

    @mcp.tool()
    async def test_network_connectivity(
        request: NetworkConnectivityRequest,
    ) -> Dict[str, Any]:
        """
        Test network connectivity to various hosts.

        This tool tests network connectivity using ping and provides
        latency and packet loss information.
        """
        try:
            connectivity_results = {}

            for host in request.hosts:
                try:
                    # Determine ping command based on platform
                    if platform.system() == "Windows":
                        ping_cmd = ["ping", "-n", str(request.ping_count), host]
                    else:
                        ping_cmd = ["ping", "-c", str(request.ping_count), host]

                    result = subprocess.run(
                        ping_cmd,
                        capture_output=True,
                        text=True,
                        timeout=request.ping_count * 5 + 10,  # Dynamic timeout
                    )

                    if result.returncode == 0:
                        # Parse ping output for statistics
                        output = result.stdout

                        # Extract basic connectivity info
                        connectivity_results[host] = {
                            "reachable": True,
                            "ping_output": output,
                            "command_used": " ".join(ping_cmd),
                        }

                        # Try to extract timing information (platform-specific parsing)
                        if platform.system() == "Windows":
                            # Windows ping output parsing
                            if "Average" in output:
                                lines = output.split("\n")
                                for line in lines:
                                    if "Average" in line:
                                        avg_time = line.split("=")[-1].strip()
                                        connectivity_results[host][
                                            "average_latency"
                                        ] = avg_time
                        else:
                            # Linux/Unix ping output parsing
                            if "min/avg/max" in output:
                                lines = output.split("\n")
                                for line in lines:
                                    if "min/avg/max" in line:
                                        times = line.split("=")[-1].strip()
                                        connectivity_results[host][
                                            "latency_stats"
                                        ] = times

                    else:
                        connectivity_results[host] = {
                            "reachable": False,
                            "error": result.stderr or "Ping failed",
                            "command_used": " ".join(ping_cmd),
                        }

                except subprocess.TimeoutExpired:
                    connectivity_results[host] = {
                        "reachable": False,
                        "error": "Ping command timed out",
                    }
                except Exception as e:
                    connectivity_results[host] = {
                        "reachable": False,
                        "error": str(e),
                    }

            # Summary statistics
            reachable_hosts = [
                h for h, r in connectivity_results.items() if r["reachable"]
            ]
            unreachable_hosts = [
                h for h, r in connectivity_results.items() if not r["reachable"]
            ]

            return {
                "test_parameters": {
                    "ping_count": request.ping_count,
                    "total_hosts": len(request.hosts),
                },
                "summary": {
                    "reachable_hosts": len(reachable_hosts),
                    "unreachable_hosts": len(unreachable_hosts),
                    # Guard against an empty host list to avoid ZeroDivisionError
                    "success_rate": (
                        round((len(reachable_hosts) / len(request.hosts)) * 100, 1)
                        if request.hosts
                        else 0.0
                    ),
                    "reachable_list": reachable_hosts,
                    "unreachable_list": unreachable_hosts,
                },
                "detailed_results": connectivity_results,
            }

        except Exception as e:
            return {"error": f"Error testing network connectivity: {str(e)}"}

    @mcp.tool()
    async def analyze_network_connections(
        request: NetworkAnalysisRequest,
    ) -> Dict[str, Any]:
        """
        Analyze current network connections and listening ports.

        This tool provides comprehensive analysis of network connections,
        listening ports, and traffic patterns for troubleshooting.
        """
        try:
            analysis_results = {}

            if request.include_listening:
                # Analyze listening ports
                try:
                    if platform.system() == "Windows":
                        cmd = ["netstat", "-an"]
                    else:
                        # Try ss first, fall back to netstat
                        try:
                            cmd_result = subprocess.run(
                                ["ss", "-tlnp"],
                                capture_output=True,
                                text=True,
                                timeout=10,
                            )
                            if cmd_result.returncode == 0:
                                cmd = ["ss", "-tlnp"]
                            else:
                                cmd = ["netstat", "-tlnp"]
                        except FileNotFoundError:
                            cmd = ["netstat", "-tlnp"]

                    result = subprocess.run(
                        cmd, capture_output=True, text=True, timeout=10
                    )

                    if result.returncode == 0:
                        lines = result.stdout.split("\n")
                        listening_ports = []

                        for line in lines:
                            if "LISTEN" in line or (
                                platform.system() != "Windows" and "tcp" in line.lower()
                            ):
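                                parts = line.split()
                                if len(parts) >= 4:
                                    # Local address column: index 1 in Windows
                                    # "netstat -an" (Proto, Local, Foreign, State);
                                    # index 3 in Linux "ss -tlnp" / "netstat -tlnp"
                                    local_address = (
                                        parts[1]
                                        if platform.system() == "Windows"
                                        else parts[3]
                                    )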
                                    if ":" in local_address:
                                        port = local_address.split(":")[-1]
                                        listening_ports.append(
                                            {
                                                "port": port,
                                                "address": local_address,
                                                "protocol": "TCP",
                                            }
                                        )

                        analysis_results["listening_ports"] = {
                            "total_count": len(listening_ports),
                            "ports": listening_ports[:20],  # Limit to first 20
                            "command_used": " ".join(cmd),
                        }
                    else:
                        analysis_results["listening_ports"] = {"error": result.stderr}

                except Exception as e:
                    analysis_results["listening_ports"] = {"error": str(e)}

            if request.include_established:
                # Analyze established connections
                try:
                    if platform.system() == "Windows":
                        cmd = ["netstat", "-an"]
                    else:
                        try:
                            cmd_result = subprocess.run(
                                ["ss", "-tnp", "state", "established"],
                                capture_output=True,
                                text=True,
                                timeout=10,
                            )
                            if cmd_result.returncode == 0:
                                cmd = ["ss", "-tnp", "state", "established"]
                            else:
                                cmd = ["netstat", "-tnp"]
                        except FileNotFoundError:
                            cmd = ["netstat", "-tnp"]

                    result = subprocess.run(
                        cmd, capture_output=True, text=True, timeout=10
                    )

                    if result.returncode == 0:
                        lines = result.stdout.split("\n")
                        established_connections = []

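                        # netstat prints a State column, but ss typically omits
                        # it when a "state established" filter is applied
                        uses_netstat = cmd[0] == "netstat"
                        is_windows = platform.system() == "Windows"

                        for line in lines:
                            parts = line.split()
                            if uses_netstat:
                                if "ESTABLISHED" not in line:
                                    continue
                                # Windows: Proto, Local, Foreign, State
                                # Linux: Proto, Recv-Q, Send-Q, Local, Foreign, State
                                local_idx = 1 if is_windows else 3
                            else:
                                # ss: [State,] Recv-Q, Send-Q, Local, Peer
                                if not parts or parts[0] in ("State", "Recv-Q"):
                                    continue
                                local_idx = 3 if parts[0].startswith("ESTAB") else 2
                            if len(parts) >= local_idx + 2:
                                established_connections.append(
                                    {
                                        "local_address": parts[local_idx],
                                        "foreign_address": parts[local_idx + 1],
                                        "state": "ESTABLISHED",
                                    }
                                )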

                        analysis_results["established_connections"] = {
                            "total_count": len(established_connections),
                            "connections": established_connections[
                                :20
                            ],  # Limit to first 20
                            "command_used": " ".join(cmd),
                        }
                    else:
                        analysis_results["established_connections"] = {
                            "error": result.stderr
                        }

                except Exception as e:
                    analysis_results["established_connections"] = {"error": str(e)}

            # Network interface information
            try:
                if platform.system() == "Windows":
                    result = subprocess.run(
                        ["ipconfig", "/all"], capture_output=True, text=True, timeout=10
                    )
                else:
                    result = subprocess.run(
                        ["ip", "addr", "show"],
                        capture_output=True,
                        text=True,
                        timeout=10,
                    )

                if result.returncode == 0:
                    analysis_results["network_interfaces"] = {
                        "configuration": result.stdout[:2000],  # Limit output size
                        "command_used": (
                            "ipconfig /all"
                            if platform.system() == "Windows"
                            else "ip addr show"
                        ),
                    }
                else:
                    analysis_results["network_interfaces"] = {"error": result.stderr}

            except Exception as e:
                analysis_results["network_interfaces"] = {"error": str(e)}

            return {
                "analysis_scope": {
                    "include_listening": request.include_listening,
                    "include_established": request.include_established,
                    # Echoed back only; this function performs no traffic analysis
                    "analyze_traffic": request.analyze_traffic,
                },
                "platform": platform.system(),
                "results": analysis_results,
            }

        except Exception as e:
            return {"error": f"Error analyzing network connections: {str(e)}"}

    @mcp.tool()
    async def diagnose_network_issues() -> Dict[str, Any]:
        """
        Perform comprehensive network diagnostics.

        This tool runs multiple network tests to identify potential
        network connectivity or configuration issues.
        """
        try:
            diagnostic_results = {}

            # Test 1: Basic connectivity to well-known servers
            connectivity_hosts = ["8.8.8.8", "1.1.1.1", "google.com"]
            connectivity_results = {}

            for host in connectivity_hosts:
                try:
                    if platform.system() == "Windows":
                        ping_cmd = ["ping", "-n", "1", host]
                    else:
                        ping_cmd = ["ping", "-c", "1", host]

                    result = subprocess.run(
                        ping_cmd, capture_output=True, text=True, timeout=10
                    )
                    connectivity_results[host] = {
                        "reachable": result.returncode == 0,
                        "details": (
                            "Success"
                            if result.returncode == 0
                            # ping often reports failures on stdout, not stderr
                            else (result.stderr or result.stdout)[:200]
                        ),
                    }
                except Exception as e:
                    connectivity_results[host] = {"reachable": False, "error": str(e)}

            diagnostic_results["connectivity_test"] = connectivity_results

            # Test 2: DNS Resolution
            dns_test_hosts = ["google.com", "github.com", "stackoverflow.com"]
            dns_results = {}

            # Imported once here; the port test below also uses it
            import socket

            for host in dns_test_hosts:
                try:
                    ip = socket.gethostbyname(host)
                    dns_results[host] = {"resolved": True, "ip_address": ip}
                except Exception as e:
                    dns_results[host] = {"resolved": False, "error": str(e)}

            diagnostic_results["dns_resolution_test"] = dns_results

            # Test 3: Common port connectivity
            important_ports = [80, 443, 53, 22]
            port_results = {}

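            # Note: google.com likely does not accept TCP connections on 53 or 22,
            # so those probes may fail even on a healthy network
            for port in important_ports:
                try:
                    # The context manager closes the socket even if connect_ex raises
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(3)
                        result = sock.connect_ex(("google.com", port))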

                    port_results[port] = {
                        "accessible": result == 0,
                        "service": {80: "HTTP", 443: "HTTPS", 53: "DNS", 22: "SSH"}.get(
                            port, "Unknown"
                        ),
                    }
                except Exception as e:
                    port_results[port] = {"accessible": False, "error": str(e)}

            diagnostic_results["port_connectivity_test"] = port_results

            # Test 4: Network interface status
            try:
                if platform.system() == "Windows":
                    result = subprocess.run(
                        ["ipconfig"], capture_output=True, text=True, timeout=5
                    )
                else:
                    result = subprocess.run(
                        ["ip", "link", "show"],
                        capture_output=True,
                        text=True,
                        timeout=5,
                    )

                diagnostic_results["interface_status"] = {
                    "command_successful": result.returncode == 0,
                    "output_preview": (
                        result.stdout[:500]
                        if result.returncode == 0
                        else result.stderr[:500]
                    ),
                }
            except Exception as e:
                diagnostic_results["interface_status"] = {"error": str(e)}

            # Overall assessment
            connectivity_success = sum(
                1 for r in connectivity_results.values() if r["reachable"]
            )
            dns_success = sum(1 for r in dns_results.values() if r["resolved"])
            port_success = sum(1 for r in port_results.values() if r["accessible"])

            total_tests = (
                len(connectivity_hosts) + len(dns_test_hosts) + len(important_ports)
            )
            successful_tests = connectivity_success + dns_success + port_success
            success_rate = (successful_tests / total_tests) * 100

            if success_rate >= 80:
                overall_status = "healthy"
            elif success_rate >= 60:
                overall_status = "fair"
            else:
                overall_status = "issues_detected"

            return {
                "overall_status": overall_status,
                "success_rate": round(success_rate, 1),
                "test_summary": {
                    "connectivity_tests": f"{connectivity_success}/{len(connectivity_hosts)} passed",
                    "dns_tests": f"{dns_success}/{len(dns_test_hosts)} passed",
                    "port_tests": f"{port_success}/{len(important_ports)} passed",
                },
                "detailed_results": diagnostic_results,
                "recommendations": _generate_network_recommendations(
                    diagnostic_results
                ),
            }

        except Exception as e:
            return {"error": f"Error performing network diagnostics: {str(e)}"}


def _generate_network_recommendations(results: Dict) -> List[str]:
    """Generate network troubleshooting recommendations based on test results."""
    recommendations = []

    # Check connectivity issues
    connectivity = results.get("connectivity_test", {})
    failed_connectivity = [
        host
        for host, result in connectivity.items()
        if not result.get("reachable", False)
    ]

    if failed_connectivity:
        recommendations.append(
            f"Check internet connectivity - failed to reach: {', '.join(failed_connectivity)}"
        )

    # Check DNS issues
    dns = results.get("dns_resolution_test", {})
    failed_dns = [
        host for host, result in dns.items() if not result.get("resolved", False)
    ]

    if failed_dns:
        recommendations.append(
            f"Check DNS configuration - failed to resolve: {', '.join(failed_dns)}"
        )

    # Check port issues
    ports = results.get("port_connectivity_test", {})
    failed_ports = [
        str(port)
        for port, result in ports.items()
        if not result.get("accessible", False)
    ]

    if failed_ports:
        recommendations.append(
            f"Check firewall settings - blocked ports: {', '.join(failed_ports)}"
        )

    if not recommendations:
        recommendations.append("Network appears to be functioning normally")

    return recommendations

```
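
The listening-port parser in `analyze_network_connections` depends on the column layout of each command's output. The following is a minimal sketch of that column logic, assuming the typical layouts of `netstat -an` on Windows (local address in the second column) and `ss -tlnp` or `netstat -tlnp` on Linux (local address in the fourth column). `parse_listening_line` is a hypothetical helper for illustration and is not part of the repository.

```python
from typing import Dict, Optional


def parse_listening_line(line: str, is_windows: bool) -> Optional[Dict[str, str]]:
    """Extract the local address and port from one LISTEN row (hypothetical helper)."""
    if "LISTEN" not in line:
        return None
    parts = line.split()
    # Assumed column layouts:
    #   Windows netstat -an:  Proto  Local  Foreign  State
    #   Linux ss -tlnp:       State  Recv-Q  Send-Q  Local  Peer  [Process]
    #   Linux netstat -tlnp:  Proto  Recv-Q  Send-Q  Local  Foreign  State  [PID/Program]
    local_idx = 1 if is_windows else 3
    if len(parts) <= local_idx or ":" not in parts[local_idx]:
        return None
    local_address = parts[local_idx]
    # rsplit takes the text after the last colon, so IPv6 forms like [::]:22 work
    port = local_address.rsplit(":", 1)[-1]
    return {"port": port, "address": local_address, "protocol": "TCP"}
```

Called with a row such as `LISTEN 0 128 0.0.0.0:22 0.0.0.0:*` and `is_windows=False`, the helper returns the address `0.0.0.0:22` with port `22`; header rows and non-listening rows return `None`.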

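On Linux, `test_network_connectivity` stores the text after `=` from the ping summary line as a raw `latency_stats` string. The sketch below shows one way a caller might split that string into numbers, assuming the common `min/avg/max/mdev ms` format (for example `11.532/12.041/12.876/0.512 ms`). `parse_latency_stats` is a hypothetical helper, not part of the repository.

```python
from typing import Dict, Optional


def parse_latency_stats(stats: str) -> Optional[Dict[str, float]]:
    """Split a 'min/avg/max[/mdev] ms' summary into floats (hypothetical helper)."""
    tokens = stats.strip().split()
    if not tokens:
        return None
    # The first whitespace-separated token holds the slash-separated values
    values = tokens[0].split("/")
    if len(values) < 3:
        return None
    try:
        min_ms, avg_ms, max_ms = (float(v) for v in values[:3])
    except ValueError:
        # Unexpected format (for example a localized or truncated summary)
        return None
    return {"min_ms": min_ms, "avg_ms": avg_ms, "max_ms": max_ms}
```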
--------------------------------------------------------------------------------
/src/mcp_log_analyzer/mcp_server/prompts/network_testing_prompt.py:
--------------------------------------------------------------------------------

```python
"""
Network testing and diagnostics prompts for the MCP Log Analyzer server.
"""

from typing import Optional
from mcp.server import FastMCP


def register_network_testing_prompts(mcp: FastMCP):
    """Register all network testing prompts."""

    @mcp.prompt(
        title="Network Tools Availability Check",
        description="Guide for testing availability of network diagnostic tools on the system"
    )
    async def network_tools_check() -> str:
        """
        Guide for checking which network diagnostic tools are available on the system.
        """
        return """
# 🔍 Network Tools Availability Check

## Tool: test_network_tools_availability

### Purpose
Tests availability of essential network diagnostic tools on your system.

### What It Checks
- **ping**: Network connectivity testing
- **netstat**: Network connections and statistics
- **ss**: Modern netstat replacement (Linux)
- **nslookup**: DNS lookup utility
- **curl**: HTTP client for web connectivity
- **wget**: Alternative HTTP client
- **telnet**: TCP connection testing
- **nc (netcat)**: Versatile networking utility

### Usage Example
```
Tool: test_network_tools_availability
```

### Interpreting Results
- **Available tools**: Can be used for diagnostics
- **Missing tools**: May need installation
- **Availability percentage**: Overall tool coverage
- **Platform notes**: OS-specific tool variations

### Next Steps
- Install missing critical tools if needed
- Use available tools for network diagnostics
- Consider platform alternatives for missing tools
"""

    @mcp.prompt(
        title="Port Connectivity Testing Guide",
        description="How to test connectivity to specific network ports"
    )
    async def port_connectivity_guide(
        service_type: Optional[str] = None
    ) -> str:
        """
        Guide for testing port connectivity.
        
        Args:
            service_type: Optional filter for specific service types (web, database, email, etc.)
        """
        
        base_guide = """
# 🔌 Port Connectivity Testing Guide

## Tool: test_port_connectivity

### Purpose
Tests TCP port accessibility and connection response times.

### Parameters
- **ports**: List of ports to test (e.g., [80, 443, 22, 3306])
- **host**: Target host (default: "localhost")
- **timeout**: Connection timeout in seconds (default: 5)

### Usage Examples
```
# Test web server ports
Tool: test_port_connectivity
Parameters: ports=[80, 443], host="example.com"

# Test database ports
Tool: test_port_connectivity
Parameters: ports=[3306, 5432, 1433], host="db-server"

# Test with custom timeout
Tool: test_port_connectivity
Parameters: ports=[8080], host="slow-server", timeout=10
```

### Capabilities
✅ Tests TCP port accessibility
✅ Measures connection response times
✅ Identifies open, closed, and filtered ports
✅ Provides summary statistics
✅ Handles connection timeouts gracefully
"""

        # Add service-specific guidance if requested
        service_guides = {
            "web": """
### Web Service Ports
- **80**: HTTP (unencrypted web traffic)
- **443**: HTTPS (encrypted web traffic)
- **8080**: Alternative HTTP port
- **8443**: Alternative HTTPS port
""",
            "database": """
### Database Service Ports
- **3306**: MySQL/MariaDB
- **5432**: PostgreSQL
- **1433**: Microsoft SQL Server
- **1521**: Oracle Database
- **27017**: MongoDB
- **6379**: Redis
""",
            "email": """
### Email Service Ports
- **25**: SMTP (sending mail)
- **110**: POP3 (retrieving mail)
- **143**: IMAP (accessing mail)
- **465**: SMTP over SSL
- **587**: SMTP submission
- **993**: IMAP over SSL
- **995**: POP3 over SSL
""",
            "remote": """
### Remote Access Ports
- **22**: SSH (Secure Shell)
- **23**: Telnet (insecure)
- **3389**: RDP (Remote Desktop)
- **5900**: VNC (Virtual Network Computing)
"""
        }

        if service_type and service_type.lower() in service_guides:
            base_guide += service_guides[service_type.lower()]

        base_guide += """
### Interpreting Results
- **Open**: Port is accessible and service is listening
- **Closed**: Port is reachable but no service listening
- **Filtered**: Firewall or security device blocking access
- **Timeout**: No response within timeout period

### Troubleshooting Failed Connections
1. Verify service is running on target host
2. Check firewall rules on both client and server
3. Ensure correct hostname/IP address
4. Test from different network locations
5. Verify network routing between hosts
"""
        
        return base_guide

    @mcp.prompt(
        title="Network Connectivity Test Guide",
        description="Guide for testing basic network connectivity using ping"
    )
    async def connectivity_test_guide() -> str:
        """
        Guide for testing network connectivity to various hosts.
        """
        return """
# 📡 Network Connectivity Test Guide

## Tool: test_network_connectivity

### Purpose
Tests network reachability using ping to measure latency and packet loss.

### Parameters
- **hosts**: List of hosts to test (default: ["8.8.8.8", "google.com", "github.com"])
- **ping_count**: Number of ping packets (default: 3)

### Usage Examples
```
# Test default hosts
Tool: test_network_connectivity

# Test specific hosts
Tool: test_network_connectivity
Parameters: hosts=["192.168.1.1", "10.0.0.1", "gateway.local"]

# Extended test with more packets
Tool: test_network_connectivity
Parameters: hosts=["remote-server.com"], ping_count=10
```

### Capabilities
✅ Tests network reachability using ping
✅ Measures latency and packet loss
✅ Cross-platform ping command support
✅ Parses ping output for statistics
✅ Provides connectivity success rates

### Interpreting Results
- **Success Rate**: Percentage of tested hosts that responded to ping
  - 100%: All hosts reachable
  - 80-99%: Most hosts reachable
  - 50-79%: Fair, noticeable issues
  - <50%: Poor connectivity

- **Latency (Round-Trip Time)**:
  - <10ms: Excellent (local network)
  - 10-50ms: Good (same region)
  - 50-150ms: Fair (cross-country)
  - >150ms: High latency (international/satellite)

### Common Test Scenarios
1. **Local Network Test**: Test router/gateway
2. **Internet Test**: Test public DNS servers (8.8.8.8, 1.1.1.1)
3. **Service Test**: Test specific service endpoints
4. **ISP Test**: Test ISP's DNS servers
"""

    @mcp.prompt(
        title="Network Connection Analysis Guide",
        description="How to analyze current network connections and listening ports"
    )
    async def connection_analysis_guide() -> str:
        """
        Guide for analyzing network connections and listening services.
        """
        return """
# 🔍 Network Connection Analysis Guide

## Tool: analyze_network_connections

### Purpose
Analyzes current network connections, listening ports, and network interfaces.

### Parameters
- **include_listening**: Include listening ports analysis (default: true)
- **include_established**: Include established connections (default: true)
- **analyze_traffic**: Analyze traffic patterns (default: false)

### Usage Examples
```
# Full analysis
Tool: analyze_network_connections

# Only listening ports
Tool: analyze_network_connections
Parameters: include_established=false

# Only active connections
Tool: analyze_network_connections
Parameters: include_listening=false
```

### What It Analyzes
1. **Listening Ports**
   - Services accepting connections
   - Port numbers and protocols
   - Potential security exposure

2. **Established Connections**
   - Active network sessions
   - Remote endpoints
   - Connection states

3. **Network Interfaces**
   - Interface configuration
   - IP addresses and DNS settings
   - Interface status

### Security Considerations
- Review listening ports for unnecessary services
- Check established connections for suspicious endpoints
- Verify services bind to appropriate interfaces
- Monitor for unusual connection patterns

### Typical Port Ranges
- **Well-known**: 0-1023 (system services)
- **Registered**: 1024-49151 (user applications)
- **Dynamic**: 49152-65535 (temporary connections)
"""

    @mcp.prompt(
        title="Quick Network Diagnostics",
        description="Fast network health check procedure"
    )
    async def quick_diagnostics() -> str:
        """
        Quick network diagnostics procedure for rapid assessment.
        """
        return """
# ⚡ Quick Network Diagnostics

## Tool: diagnose_network_issues

### Purpose
Performs comprehensive network diagnostics in a single operation.

### What It Tests
✅ Basic connectivity to well-known servers
✅ DNS resolution functionality
✅ Common port accessibility
✅ Network interface status
✅ Overall network health assessment

### Usage
```
Tool: diagnose_network_issues
```

### Results Include
- **Connectivity Status**: Reachability of well-known internet hosts
- **DNS Health**: Resolution success/failure
- **Service Availability**: Common ports status
- **Network Interfaces**: Configuration and status
- **Health Score**: Overall network assessment
- **Recommendations**: Suggested fixes for issues

### When to Use
- Initial network troubleshooting
- Quick health check before detailed analysis
- Periodic network monitoring
- After network configuration changes
- When users report connectivity issues

### Follow-up Actions
Based on results, you may need to:
1. Run specific port tests
2. Check DNS configuration
3. Analyze network connections
4. Test specific services
5. Review firewall rules
"""

    @mcp.prompt(
        title="Network Port Reference",
        description="Common network ports and their services"
    )
    async def port_reference(
        category: Optional[str] = None
    ) -> str:
        """
        Reference guide for common network ports.
        
        Args:
            category: Optional category filter (web, database, email, remote, etc.)
        """
        
        all_ports = {
            "web": """
## 🌐 Web Service Ports
- **80**: HTTP - Standard web traffic
- **443**: HTTPS - Secure web traffic
- **8080**: HTTP alternate - Common development port
- **8443**: HTTPS alternate - Secure development port
- **3000**: Node.js development server
- **4200**: Angular development server
- **8000**: Python HTTP server
""",
            "database": """
## 💾 Database Ports
- **3306**: MySQL/MariaDB
- **5432**: PostgreSQL
- **1433**: Microsoft SQL Server
- **1521**: Oracle Database
- **27017**: MongoDB
- **6379**: Redis
- **5984**: CouchDB
- **7474**: Neo4j
- **9042**: Cassandra
""",
            "email": """
## 📧 Email Service Ports
- **25**: SMTP - Mail sending
- **110**: POP3 - Mail retrieval
- **143**: IMAP - Mail access
- **465**: SMTP SSL - Secure mail sending
- **587**: SMTP submission
- **993**: IMAP SSL - Secure mail access
- **995**: POP3 SSL - Secure mail retrieval
""",
            "remote": """
## 🖥️ Remote Access Ports
- **22**: SSH - Secure Shell
- **23**: Telnet - Insecure remote access
- **3389**: RDP - Windows Remote Desktop
- **5900**: VNC - Virtual Network Computing
- **5985**: WinRM HTTP
- **5986**: WinRM HTTPS
""",
            "messaging": """
## 💬 Messaging & Queue Ports
- **5672**: RabbitMQ
- **61613**: ActiveMQ STOMP
- **9092**: Apache Kafka
- **4222**: NATS
- **6667**: IRC
- **5222**: XMPP client
""",
            "monitoring": """
## 📊 Monitoring & Management Ports
- **161**: SNMP - Network monitoring
- **514**: Syslog
- **2003**: Graphite
- **8086**: InfluxDB
- **9090**: Prometheus
- **3000**: Grafana
- **9200**: Elasticsearch
- **5601**: Kibana
"""
        }
        
        result = "# 🔌 Network Port Reference\n\n"
        
        if category and category.lower() in all_ports:
            result += all_ports[category.lower()]
        else:
            result += "## Common Network Ports by Category\n\n"
            for content in all_ports.values():
                result += content + "\n"
        
        result += """
## Port Number Ranges
- **0-1023**: Well-known ports (require root/admin)
- **1024-49151**: Registered ports (user applications)
- **49152-65535**: Dynamic/private ports (temporary)

## Security Notes
- Only open ports that are necessary
- Use encrypted alternatives when available
- Monitor listening ports regularly
- Implement firewall rules for protection
"""
        
        return result

    @mcp.prompt(
        title="Network Issue: Cannot Connect to Internet",
        description="Troubleshooting guide for complete internet connectivity loss"
    )
    async def troubleshoot_no_internet() -> str:
        """
        Step-by-step guide for troubleshooting internet connectivity issues.
        """
        return """
# 🚫 Troubleshooting: Cannot Connect to Internet

## Symptoms
- No websites load
- Ping to external IPs fails
- DNS resolution fails
- Local network may still work

## Diagnostic Steps

### Step 1: Test Network Tools
```
Tool: test_network_tools_availability
```
Verify diagnostic tools are available.

### Step 2: Test Local Network
```
Tool: test_network_connectivity
Parameters: hosts=["192.168.1.1", "10.0.0.1"]
```
Check if local network/router is reachable.

### Step 3: Test DNS
```
Tool: test_port_connectivity
Parameters: ports=[53], host="8.8.8.8"
```
Check if DNS servers are reachable.

### Step 4: Test External Connectivity
```
Tool: test_network_connectivity
Parameters: hosts=["8.8.8.8", "1.1.1.1"]
```
Test connectivity to public DNS servers.

### Step 5: Comprehensive Check
```
Tool: diagnose_network_issues
```
Run full network diagnostics.

## Solutions by Root Cause

### Router/Gateway Issue
- Restart router/modem
- Check cable connections
- Verify router lights/status
- Access router admin panel

### ISP Outage
- Check ISP status page
- Contact ISP support
- Wait for service restoration
- Consider backup connection

### DNS Issues
- Change DNS servers to 8.8.8.8, 1.1.1.1
- Flush DNS cache
- Check DNS configuration
- Test with IP addresses directly

### Network Interface
- Check interface status
- Restart network service
- Update network drivers
- Reset network settings
"""

    @mcp.prompt(
        title="Network Issue: Slow Performance",
        description="Troubleshooting guide for slow network performance"
    )
    async def troubleshoot_slow_network() -> str:
        """
        Guide for diagnosing and fixing slow network performance.
        """
        return """
# 🐌 Troubleshooting: Slow Network Performance

## Symptoms
- Websites load very slowly
- High latency in ping tests
- Downloads/uploads are slow
- Intermittent timeouts

## Diagnostic Steps

### Step 1: Measure Baseline Performance
```
Tool: test_network_connectivity
Parameters: hosts=["8.8.8.8", "google.com", "cloudflare.com"], ping_count=10
```
Measure latency to various destinations.

### Step 2: Test Port Response Times
```
Tool: test_port_connectivity
Parameters: ports=[80, 443], host="google.com", timeout=10
```
Measure connection establishment times.

### Step 3: Check Network Utilization
```
Tool: analyze_network_connections
Parameters: include_established=true
```
Look for high connection counts or unusual traffic.

### Step 4: Test Different Destinations
```
Tool: test_network_connectivity
Parameters: hosts=["local-server", "regional-server", "global-server"]
```
Identify if slowness is location-specific.

## Performance Benchmarks

### Latency Guidelines
- **Excellent**: <50ms
- **Good**: 50-100ms
- **Fair**: 100-200ms
- **Poor**: >200ms

### Connection Times
- **Fast**: <1s
- **Normal**: 1-3s
- **Slow**: 3-10s
- **Very Slow**: >10s

## Common Causes & Solutions

### High Latency
- Check for network congestion
- Review QoS settings
- Test wired vs wireless
- Check for interference

### Bandwidth Issues
- Run speed test
- Check for bandwidth limits
- Monitor usage patterns
- Upgrade internet plan

### Local Congestion
- Limit bandwidth-heavy apps
- Schedule large downloads
- Check for malware
- Review connected devices

### DNS Delays
- Switch to faster DNS (1.1.1.1, 8.8.8.8)
- Enable DNS caching
- Check DNS server load
- Use local DNS server
"""

    @mcp.prompt(
        title="Network Issue: Service Unreachable",
        description="Troubleshooting guide for when specific services cannot be reached"
    )
    async def troubleshoot_service_unreachable() -> str:
        """
        Guide for troubleshooting unreachable network services.
        """
        return """
# 🔌 Troubleshooting: Service Unreachable

## Symptoms
- Specific website/service won't load
- Other internet services work fine
- "Connection refused" or timeout errors
- Service works for others

## Diagnostic Steps

### Step 1: Test Service Ports
```
Tool: test_port_connectivity
Parameters: ports=[80, 443], host="target-server.com"
```
Check if service ports are accessible.

### Step 2: Test Basic Connectivity
```
Tool: test_network_connectivity
Parameters: hosts=["target-server.com"]
```
Verify basic network path to server.

### Step 3: Check DNS Resolution
```
Tool: diagnose_network_issues
```
Verify DNS is resolving correctly.

### Step 4: Test From a Different Network or Device
If possible, test from:
- Different device
- Mobile hotspot
- VPN connection
- Different location

### Step 5: Check Local Services
```
Tool: analyze_network_connections
Parameters: include_listening=true
```
For local services, verify they're listening.

## Root Cause Analysis

### Port Blocked
- **Local Firewall**: Check outbound rules
- **Corporate Firewall**: Contact IT
- **ISP Blocking**: Try different port/VPN
- **Server Firewall**: Contact service admin

### Service Down
- Check service status page
- Try alternative endpoints
- Contact service support
- Wait for restoration

### DNS Issues
- Try direct IP if known
- Use different DNS server
- Clear DNS cache
- Check hosts file

### Routing Issue
- Traceroute to identify where it fails
- Try VPN to bypass routing
- Contact ISP if persistent
- Wait for route fix

## Service-Specific Checks

### Web Services (80/443)
- Test HTTP and HTTPS separately
- Check for SSL/TLS issues
- Try different browsers
- Clear browser cache

### Database Services
- Verify connection string
- Check authentication
- Test from DB client
- Review access permissions

### Email Services
- Test each protocol separately
- Verify authentication
- Check SSL/TLS settings
- Review port numbers
"""

    @mcp.prompt(
        title="Network Security Audit",
        description="Guide for performing basic network security assessment"
    )
    async def network_security_audit() -> str:
        """
        Guide for conducting a basic network security audit.
        """
        return """
# 🔒 Network Security Audit Guide

## Purpose
Identify potential security risks in network configuration.

## Audit Steps

### Step 1: Audit Listening Ports
```
Tool: analyze_network_connections
Parameters: include_listening=true, include_established=false
```
Identify all services accepting connections.

### Step 2: Check Established Connections
```
Tool: analyze_network_connections
Parameters: include_listening=false, include_established=true
```
Review current connections for suspicious activity.

### Step 3: Test External Exposure
```
Tool: test_port_connectivity
Parameters: ports=[22, 3389, 3306, 5432], host="your-external-ip"
```
Check if sensitive services are exposed.

### Step 4: Verify Critical Services
```
Tool: test_port_connectivity
Parameters: ports=[80, 443, 22], host="localhost"
```
Ensure legitimate services are accessible.

## Security Checklist

### Listening Ports Review
- [ ] Document all listening services
- [ ] Identify unnecessary open ports
- [ ] Verify services bind to correct interfaces
- [ ] Check for default/development ports

### Connection Analysis
- [ ] Review established connections
- [ ] Look for unusual destinations
- [ ] Check for high connection counts
- [ ] Monitor for persistent connections

### Common Security Issues

#### Unnecessary Services
- Development servers on production
- Default services never disabled
- Debug ports left open
- Unused database ports

#### Weak Bindings
- Services bound to 0.0.0.0 unnecessarily
- No firewall rules in place
- Default configurations unchanged
- Missing network segmentation

#### Suspicious Activity
- Connections to unknown IPs
- Unusual port numbers
- High-numbered listening ports
- Services running as root/admin

## Remediation Steps

1. **Close Unnecessary Ports**
   - Stop unused services
   - Update firewall rules
   - Document required ports

2. **Restrict Access**
   - Bind to specific interfaces
   - Implement access controls
   - Use VPN for sensitive services

3. **Monitor Regularly**
   - Schedule periodic audits
   - Set up port monitoring
   - Alert on new services
   - Log connection attempts

4. **Follow Best Practices**
   - Change default ports
   - Use encrypted protocols
   - Implement least privilege
   - Keep services updated
"""

    @mcp.prompt(
        title="Emergency Network Diagnostics",
        description="Quick triage protocol for network emergencies"
    )
    async def emergency_diagnostics() -> str:
        """
        Emergency response guide for critical network issues.
        """
        return """
# 🚨 Emergency Network Diagnostics Protocol

## When to Use
- Complete network outage
- Critical service failure
- Performance degradation affecting business
- Security incident response

## Phase 1: Immediate Assessment (< 2 minutes)

### Quick Health Check
```
Tool: diagnose_network_issues
```
Get overall network status immediately.

### Basic Connectivity Test
```
Tool: test_network_connectivity
Parameters: hosts=["8.8.8.8", "google.com"]
```
Determine scope of outage.

### Decision Point
- **Local issue**: Proceed to Phase 2A
- **Widespread issue**: Proceed to Phase 2B
- **Service-specific**: Proceed to Phase 2C

## Phase 2A: Local Issue Investigation (2-5 minutes)

### Check Local Services
```
Tool: analyze_network_connections
Parameters: include_listening=true
```
Verify critical services are running.

### Test Critical Ports
```
Tool: test_port_connectivity
Parameters: ports=[critical-service-ports]
```
Ensure services are accessible.

## Phase 2B: Widespread Outage (2-5 minutes)

### Test Network Path
```
Tool: test_network_connectivity
Parameters: hosts=["gateway", "isp-dns", "8.8.8.8"]
```
Identify where connectivity fails.

### Check Interfaces
```
Tool: analyze_network_connections
```
Verify network interfaces are up.

## Phase 2C: Service-Specific Issue (2-5 minutes)

### Test Service Connectivity
```
Tool: test_port_connectivity
Parameters: ports=[service-ports], host="service-host"
```
Check specific service accessibility.

### Verify DNS
```
Tool: test_network_connectivity
Parameters: hosts=["service-hostname"]
```
Ensure name resolution works.

## Phase 3: Detailed Analysis (5+ minutes)
Based on Phase 2 results:
- Run targeted diagnostics
- Check logs and events
- Test from multiple locations
- Verify recent changes

## Emergency Response Actions

### Communication
1. Alert stakeholders immediately
2. Document timeline and impacts
3. Provide regular updates
4. Track resolution progress

### Temporary Measures
- Implement workarounds
- Redirect traffic if possible
- Enable backup systems
- Communicate alternatives

### Root Cause Analysis
- Document all findings
- Identify contributing factors
- Plan preventive measures
- Schedule post-mortem

## Critical Service Priority
1. **Network Core**: Routers, switches
2. **Security**: Firewalls, IDS/IPS
3. **External Access**: Internet gateway
4. **DNS Services**: Name resolution
5. **Business Critical**: Specific to organization

## Escalation Triggers
- No progress after 15 minutes
- Multiple services affected
- Security incident suspected
- Hardware failure indicated
- Business impact increasing

Remember: In emergencies, speed is critical, but accuracy prevents making things worse. Follow the protocol, document everything, and escalate when needed.
"""

    @mcp.prompt(
        title="Network Performance Baseline",
        description="Guide for establishing network performance baselines"
    )
    async def performance_baseline_guide() -> str:
        """
        Guide for establishing and monitoring network performance baselines.
        """
        return """
# 📊 Network Performance Baseline Guide

## Why Establish Baselines?
- Detect performance degradation early
- Identify abnormal behavior
- Plan capacity upgrades
- Troubleshoot more effectively

## Baseline Measurements

### 1. Latency Baseline
```
Tool: test_network_connectivity
Parameters: hosts=["local-router", "isp-gateway", "major-sites"], ping_count=20
```

Measure and record:
- Local network latency (should be <5ms)
- ISP gateway latency (typically <20ms)
- Internet latency (varies by distance)
- Peak vs off-peak differences

### 2. Service Response Times
```
Tool: test_port_connectivity
Parameters: ports=[your-service-ports], timeout=10
```

Track:
- Connection establishment time
- Service response time
- Timeout frequency
- Time-of-day variations

### 3. Connection Patterns
```
Tool: analyze_network_connections
```

Monitor:
- Typical connection count
- Common connection destinations
- Port usage patterns
- Protocol distribution

## Baseline Schedule

### Daily Measurements
- Morning: Before business hours
- Midday: Peak usage time
- Evening: After business hours
- Night: Minimum usage baseline

### Weekly Analysis
- Compare daily patterns
- Identify usage trends
- Note any anomalies
- Update documentation

### Monthly Review
- Calculate averages
- Identify long-term trends
- Plan for capacity
- Update thresholds

## Key Metrics to Track

### Latency Metrics
- Minimum RTT
- Average RTT
- Maximum RTT
- Standard deviation
- Packet loss percentage

### Bandwidth Metrics
- Peak utilization
- Average utilization
- Time above 80% utilization
- Growth rate

### Connection Metrics
- Peak connection count
- Average connections
- New connections/second
- Connection duration

## Setting Alert Thresholds

### Latency Alerts
- Warning: 150% of baseline
- Critical: 200% of baseline
- Sustained: Threshold exceeded for over 5 minutes

### Connection Alerts
- Warning: 150% of baseline
- Critical: 200% of baseline
- New services detected

### Availability Alerts
- Service down: Immediate
- Intermittent: >3 failures/hour
- Degraded: Response >2x baseline

## Using Baselines for Troubleshooting

1. **Compare Current to Baseline**
   - Is latency higher than normal?
   - Are connection patterns different?
   - Are new services appearing?

2. **Identify Deviations**
   - When did the change occur?
   - What else changed then?
   - Is it following a pattern?

3. **Correlate with Events**
   - System updates
   - Configuration changes
   - Usage patterns
   - External factors

## Documentation Template

### Network Performance Baseline Record
```
Date: [YYYY-MM-DD]
Time: [HH:MM]
Measured By: [Name]

Latency Baseline:
- Local Network: [X]ms
- ISP Gateway: [X]ms
- Internet (avg): [X]ms

Connection Baseline:
- Active Connections: [X]
- Listening Ports: [List]
- Peak Usage Time: [Time]

Notes:
- Special conditions
- Anomalies observed
- Changes from previous
```

## Best Practices

1. **Consistency**
   - Same measurement points
   - Same time periods
   - Same tools/methods
   - Regular schedule

2. **Documentation**
   - Record all measurements
   - Note environmental factors
   - Track configuration changes
   - Maintain history

3. **Review & Adjust**
   - Update baselines quarterly
   - Account for growth
   - Adjust thresholds
   - Plan for capacity

A good baseline is your most valuable troubleshooting tool!
"""
```
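
The latency guidelines and alert thresholds described in the prompts above could be applied to measured values with a small helper. The following is a minimal sketch and is not part of the repository: `rate_latency` and `alert_level` are hypothetical names. It assumes that a shared band boundary (for example exactly 100 ms) belongs to the better band, that the 150% and 200% thresholds are inclusive, and it ignores the "sustained for over 5 minutes" condition, which would need a time series rather than a single sample.

```python
def rate_latency(ms: float) -> str:
    """Map a round-trip time in milliseconds to the latency bands in the prompt.

    Bands (from the "Latency Guidelines" list): Excellent <50, Good 50-100,
    Fair 100-200, Poor >200. Assumption: upper bounds are inclusive.
    """
    if ms < 0:
        raise ValueError("latency cannot be negative")
    if ms < 50:
        return "Excellent"
    if ms <= 100:
        return "Good"
    if ms <= 200:
        return "Fair"
    return "Poor"


def alert_level(current: float, baseline: float) -> str:
    """Compare a current measurement to its baseline.

    Thresholds (from "Setting Alert Thresholds"): warning at 150% of
    baseline, critical at 200%. Assumption: both thresholds are inclusive.
    """
    if baseline <= 0:
        raise ValueError("baseline must be positive")
    ratio = current / baseline
    if ratio >= 2.0:
        return "critical"
    if ratio >= 1.5:
        return "warning"
    return "ok"


if __name__ == "__main__":
    # Hypothetical sample values, not taken from any real measurement.
    print(rate_latency(35.0))
    print(alert_level(current=45.0, baseline=20.0))
```

Keeping the two checks separate mirrors the prompts: the absolute bands describe how a connection feels to a user, while the baseline ratio flags a change from what is normal for a given network.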