# Directory Structure
```
├── .dockerignore
├── .env.example
├── .gitignore
├── .python-version
├── analyzer
│   ├── __init__.py
│   ├── analyzer.py
│   ├── bottleneck
│   │   ├── __init__.py
│   │   └── analyzer.py
│   ├── insights
│   │   ├── __init__.py
│   │   └── generator.py
│   ├── mcp
│   │   └── __init__.py
│   ├── metrics
│   │   ├── __init__.py
│   │   └── calculator.py
│   ├── models.py
│   ├── parser
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── csv_parser.py
│   │   └── xml_parser.py
│   └── visualization
│       ├── __init__.py
│       └── engine.py
├── Dockerfile
├── images
│   ├── Anthropic-MCP.png
│   ├── Cursor.png
│   └── Windsurf.png
├── jmeter_report.html
├── jmeter_server.py
├── main.py
├── mcp_config.json
├── pyproject.toml
├── README.md
├── requirements_windsurf_reader.txt
├── requirements.txt
├── sample_test.jmx
├── smithery.yaml
├── tests
│   ├── __init__.py
│   ├── test_analyzer_models.py
│   ├── test_analyzer_parser.py
│   ├── test_bottleneck_analyzer.py
│   ├── test_csv_parser.py
│   ├── test_insights_generator.py
│   ├── test_jmeter_server.py
│   ├── test_metrics_calculator.py
│   ├── test_visualization_engine.py
│   └── test_xml_parser.py
├── windsurf_db_reader_alternative.py
└── windsurf_db_reader.py
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
.git
.gitignore
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.env
*.log
.DS_Store
Dockerfile
.dockerignore
README.md 
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# JMeter Configuration
JMETER_HOME=/path/to/apache-jmeter-5.6.3
JMETER_BIN=${JMETER_HOME}/bin/jmeter
# Optional: JMeter Java options
JMETER_JAVA_OPTS="-Xms1g -Xmx2g"
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock
# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
uv.lock
# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
.DS_Store
.env
*.jtl
*.csv
.kiro/
*.zip
*.chat
.kiro/debug/chats/1.chat
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# 🚀 JMeter MCP Server
This is a Model Context Protocol (MCP) server that allows executing JMeter tests through MCP-compatible clients and analyzing test results.
> [!IMPORTANT]
> 📢 Looking for an AI Assistant inside JMeter? 🚀
> Check out [Feather Wand](https://jmeter.ai)



## 📋 Features
### JMeter Execution
- 📊 Execute JMeter tests in non-GUI mode
- 🖥️ Launch JMeter in GUI mode
- 📝 Capture and return execution output
- 📊 Generate JMeter report dashboard
### Test Results Analysis
- 📈 Parse and analyze JMeter test results (JTL files)
- 📊 Calculate comprehensive performance metrics
- 🔍 Identify performance bottlenecks automatically
- 💡 Generate actionable insights and recommendations
- 📊 Create visualizations of test results
- 📑 Generate HTML reports with analysis results
## 🛠️ Installation
### Local Installation
1. Install [`uv`](https://github.com/astral-sh/uv).
2. Ensure JMeter is installed on your system and accessible via the command line.
⚠️ **Important**: Make sure JMeter is executable. You can do this by running:
```bash
chmod +x /path/to/jmeter/bin/jmeter
```
3. Install required Python dependencies:
```bash
pip install numpy matplotlib
```
4. Configure the `.env` file, refer to the `.env.example` file for details.
```bash
# JMeter Configuration
JMETER_HOME=/path/to/apache-jmeter-5.6.3
JMETER_BIN=${JMETER_HOME}/bin/jmeter
# Optional: JMeter Java options
JMETER_JAVA_OPTS="-Xms1g -Xmx2g"
```
### 💻 MCP Usage
1. Connect to the server using an MCP-compatible client (e.g., Claude Desktop, Cursor, Windsurf)
2. Send a prompt to the server:
```
Run JMeter test /path/to/test.jmx
```
3. The MCP-compatible client will use the available tools:
#### JMeter Execution Tools
- 🖥️ `execute_jmeter_test`: Launches JMeter in GUI mode; it does not execute the test, as per JMeter's design
- 🚀 `execute_jmeter_test_non_gui`: Execute a JMeter test in non-GUI mode (default mode for better performance)
#### Test Results Analysis Tools
- 📊 `analyze_jmeter_results`: Analyze JMeter test results and provide a summary of key metrics and insights
- 🔍 `identify_performance_bottlenecks`: Identify performance bottlenecks in JMeter test results
- 💡 `get_performance_insights`: Get insights and recommendations for improving performance
- 📈 `generate_visualization`: Generate visualizations of JMeter test results
## 🏗️ MCP Configuration
Add the following configuration to your MCP client config:
```json
{
    "mcpServers": {
      "jmeter": {
        "command": "/path/to/uv",
        "args": [
          "--directory",
          "/path/to/jmeter-mcp-server",
          "run",
          "jmeter_server.py"
        ]
      }
    }
}
```
## ✨ Use Cases
### Test Execution
- Run JMeter tests in non-GUI mode for better performance
- Launch JMeter in GUI mode for test development
- Generate JMeter report dashboards
### Test Results Analysis
- Analyze JTL files to understand performance characteristics
- Identify performance bottlenecks and their severity
- Get actionable recommendations for performance improvements
- Generate visualizations for better understanding of results
- Create comprehensive HTML reports for sharing with stakeholders
## 🛑 Error Handling
The server will:
- Validate that the test file exists
- Check that the file has a .jmx extension
- Validate that JTL files exist and have valid formats
- Capture and return any execution or analysis errors
## 📊 Test Results Analyzer
The Test Results Analyzer is a powerful feature that helps you understand your JMeter test results better. It consists of several components:
### Parser Module
- Supports both XML and CSV JTL formats
- Efficiently processes large files with streaming parsers
- Validates file formats and handles errors gracefully
### Metrics Calculator
- Calculates overall performance metrics (average, median, percentiles)
- Provides endpoint-specific metrics for detailed analysis
- Generates time series metrics to track performance over time
- Compares metrics with benchmarks for context
### Bottleneck Analyzer
- Identifies slow endpoints based on response times
- Detects error-prone endpoints with high error rates
- Finds response time anomalies and outliers
- Analyzes the impact of concurrency on performance
### Insights Generator
- Provides specific recommendations for addressing bottlenecks
- Analyzes error patterns and suggests solutions
- Generates insights on scaling behavior and capacity limits
- Prioritizes recommendations based on potential impact
### Visualization Engine
- Creates time series graphs showing performance over time
- Generates distribution graphs for response time analysis
- Produces endpoint comparison charts for identifying issues
- Creates comprehensive HTML reports with all analysis results
## 📝 Example Usage
```
# Run a JMeter test and generate a results file
Run JMeter test sample_test.jmx in non-GUI mode and save results to results.jtl
# Analyze the results
Analyze the JMeter test results in results.jtl and provide detailed insights
# Identify bottlenecks
What are the performance bottlenecks in the results.jtl file?
# Get recommendations
What recommendations do you have for improving performance based on results.jtl?
# Generate visualizations
Create a time series graph of response times from results.jtl
```
```
--------------------------------------------------------------------------------
/requirements_windsurf_reader.txt:
--------------------------------------------------------------------------------
```
plyvel>=1.3.0
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp[cli]<1.6.0
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Test package initializer for jmeter_server tests.
"""
```
--------------------------------------------------------------------------------
/analyzer/mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""
MCP interface for the JMeter Test Results Analyzer.
This module provides MCP tools for analyzing JMeter test results.
"""
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "jmeter-mcp-server"
version = "0.1.0"
description = "JMeter MCP Server"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "httpx>=0.28.1",
    "mcp[cli]>=1.6.0",
]
```
--------------------------------------------------------------------------------
/mcp_config.json:
--------------------------------------------------------------------------------
```json
{
    "mcpServers": {
      "jmeter": {
        "command": "/path/to/uv",
        "args": [
          "--directory",
          "/path/to/jmeter-mcp-server",
          "run",
          "jmeter_server.py"
        ]
      }
    }
}
```
--------------------------------------------------------------------------------
/analyzer/metrics/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Metrics module for JMeter test results.
This module provides functionality for calculating performance metrics
from JMeter test results.
"""
from analyzer.metrics.calculator import MetricsCalculator
__all__ = ['MetricsCalculator']
```
--------------------------------------------------------------------------------
/analyzer/__init__.py:
--------------------------------------------------------------------------------
```python
"""
JMeter Test Results Analyzer module.
This module provides functionality for analyzing JMeter test results,
calculating performance metrics, identifying bottlenecks, and generating
insights and recommendations.
"""
__version__ = '0.1.0'
```
--------------------------------------------------------------------------------
/analyzer/bottleneck/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Bottleneck analyzer module for JMeter test results.
This module provides functionality for identifying performance bottlenecks
in JMeter test results.
"""
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
__all__ = ['BottleneckAnalyzer']
```
--------------------------------------------------------------------------------
/analyzer/visualization/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Visualization module for JMeter test results.
This module provides functionality for creating visual representations
of JMeter test results analysis.
"""
from analyzer.visualization.engine import VisualizationEngine
__all__ = ['VisualizationEngine']
```
--------------------------------------------------------------------------------
/analyzer/insights/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Insights module for JMeter test results.
This module provides functionality for generating insights and recommendations
based on JMeter test results analysis.
"""
from analyzer.insights.generator import InsightsGenerator
__all__ = ['InsightsGenerator']
```
--------------------------------------------------------------------------------
/analyzer/parser/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Parser module for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in both XML and CSV formats.
"""
from analyzer.parser.base import JTLParser
from analyzer.parser.xml_parser import XMLJTLParser
from analyzer.parser.csv_parser import CSVJTLParser
__all__ = ['JTLParser', 'XMLJTLParser', 'CSVJTLParser']
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
import os
# Load environment variables via python-dotenv before anything reads them
# (main() below reports JMETER_HOME, JMETER_BIN and JMETER_JAVA_OPTS).
load_dotenv()
# Create the module-level FastMCP server instance, named "jmeter"; main() runs it.
mcp = FastMCP("jmeter")
def main():
    """Report the JMeter environment settings and run the MCP server on stdio.

    Startup diagnostics go to stderr rather than stdout: the server is run
    with the stdio transport, where stdout carries the protocol messages, so
    free-form text written there would reach the client as malformed traffic.
    """
    import sys  # local import: keeps this block self-contained

    print("Starting JMeter MCP server...", file=sys.stderr)
    # Echo the configured values (None when a variable is unset).
    for variable in ('JMETER_HOME', 'JMETER_BIN', 'JMETER_JAVA_OPTS'):
        print(os.getenv(variable), file=sys.stderr)
    mcp.run(transport='stdio')
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties: {}
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    (config) => ({command: 'python', args: [
        "jmeter_server.py"
      ], env: {}})
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Use Python base image
FROM python:3.10-slim
# Install OpenJDK and build dependencies
RUN apt-get update && \
    apt-get install -y default-jdk wget && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Install JMeter
ENV JMETER_VERSION="5.6.3"
ENV JMETER_HOME="/opt/apache-jmeter-${JMETER_VERSION}"
ENV PATH="$JMETER_HOME/bin:$PATH"
RUN wget https://archive.apache.org/dist/jmeter/binaries/apache-jmeter-${JMETER_VERSION}.tgz && \
    tar -xzf apache-jmeter-${JMETER_VERSION}.tgz -C /opt && \
    rm apache-jmeter-${JMETER_VERSION}.tgz
# Set working directory
WORKDIR /app
# Copy application files
COPY . .
# Install Python dependencies
RUN pip install --upgrade pip && \
    pip install "mcp[cli]<1.6.0" && \
    pip install --no-cache-dir -r requirements.txt
# Expose port (adjust if your server uses a different port)
EXPOSE 8000
# Run the server
CMD ["python", "jmeter_server.py"] 
```
--------------------------------------------------------------------------------
/analyzer/models.py:
--------------------------------------------------------------------------------
```python
"""
Data models for the JMeter Test Results Analyzer.
This module defines the core data structures used throughout the analyzer,
including TestResults, Sample, and various metrics classes.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
@dataclass
class Sample:
    """Represents a single sample/request in a JMeter test.

    The first five fields are required; the remaining ones are optional and
    default to ``None``.
    """
    
    timestamp: datetime  # time associated with the sample
    label: str  # sampler label of the request
    response_time: int  # in milliseconds
    success: bool  # True when the sample was recorded as successful
    response_code: str  # kept as text rather than converted to a number
    error_message: Optional[str] = None
    thread_name: Optional[str] = None
    bytes_sent: Optional[int] = None
    bytes_received: Optional[int] = None
    latency: Optional[int] = None  # in milliseconds
    connect_time: Optional[int] = None  # in milliseconds
@dataclass
class TestResults:
    """Collection of samples from a JMeter run plus the time window they span."""

    samples: List[Sample] = field(default_factory=list)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def add_sample(self, sample: Sample) -> None:
        """Append *sample* and stretch the start/end window to include it."""
        self.samples.append(sample)

        stamp = sample.timestamp
        earliest, latest = self.start_time, self.end_time

        # The first sample seeds both bounds; later ones only push them outward.
        self.start_time = stamp if earliest is None or stamp < earliest else earliest
        self.end_time = stamp if latest is None or stamp > latest else latest
@dataclass
class OverallMetrics:
    """Represents overall metrics for a test or endpoint.

    Every field defaults to zero, so an instance can be created without
    arguments and populated afterwards.
    """
    
    total_samples: int = 0
    error_count: int = 0
    error_rate: float = 0.0  # NOTE(review): scale (fraction vs. percent) is not defined here — confirm
    # Response-time statistics; presumably in milliseconds like
    # Sample.response_time — TODO confirm against the code that fills them in.
    average_response_time: float = 0.0
    median_response_time: float = 0.0
    percentile_90: float = 0.0
    percentile_95: float = 0.0
    percentile_99: float = 0.0
    min_response_time: float = 0.0
    max_response_time: float = 0.0
    throughput: float = 0.0  # requests per second
    test_duration: float = 0.0  # in seconds
@dataclass
class EndpointMetrics(OverallMetrics):
    """Represents metrics for a specific endpoint/sampler.

    Inherits every field of OverallMetrics and adds the endpoint name.
    """
    
    endpoint: str = ""  # endpoint/sampler name; empty string by default
@dataclass
class TimeSeriesMetrics:
    """Represents metrics for a specific time interval.

    Only ``timestamp`` is required; the numeric fields default to zero.
    """
    
    timestamp: datetime  # NOTE(review): presumably marks the start of the interval — confirm
    active_threads: int = 0
    throughput: float = 0.0
    average_response_time: float = 0.0
    error_rate: float = 0.0
@dataclass
class Bottleneck:
    """Represents a performance bottleneck.

    All fields are required.
    """
    
    endpoint: str  # endpoint the bottleneck was found on
    metric_type: str  # response_time, error_rate, etc.
    value: float  # value recorded for the metric
    threshold: float  # threshold the value is reported against
    severity: str  # high, medium, low
@dataclass
class Anomaly:
    """Represents a performance anomaly.

    All fields are required.
    """
    
    timestamp: datetime  # time associated with the anomaly
    endpoint: str  # endpoint the anomaly relates to
    expected_value: float
    actual_value: float
    # NOTE(review): presumably the gap between actual and expected expressed
    # as a percentage; the exact formula lives in the producing code — confirm.
    deviation_percentage: float
@dataclass
class Recommendation:
    """Represents a performance improvement recommendation.

    All fields are required free-text strings.
    """
    
    issue: str  # the problem being addressed
    recommendation: str  # the suggested change
    expected_impact: str  # description of the anticipated benefit
    implementation_difficulty: str  # high, medium, low
@dataclass
class Insight:
    """Represents a performance insight.

    ``topic`` and ``description`` are required; ``supporting_data`` is optional.
    """
    
    topic: str
    description: str
    # default_factory gives every instance its own empty dict instead of a
    # single dict shared between instances.
    supporting_data: Dict = field(default_factory=dict)
```
--------------------------------------------------------------------------------
/analyzer/parser/base.py:
--------------------------------------------------------------------------------
```python
"""
Base parser interface for JMeter test results.
This module defines the base interface for JTL parsers.
"""
import abc
from pathlib import Path
from typing import Union
from analyzer.models import TestResults
class JTLParser(abc.ABC):
    """Base class for JTL parsers.

    Concrete parsers implement :meth:`parse_file`. The static helpers
    :meth:`validate_file` and :meth:`detect_format` are shared by all of them.
    """

    @abc.abstractmethod
    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file and return structured test results.

        Args:
            file_path: Path to the JTL file

        Returns:
            TestResults object containing parsed data

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is invalid
        """
        pass

    @staticmethod
    def validate_file(file_path: Union[str, Path]) -> bool:
        """Validate that the file exists and has a valid extension.

        Args:
            file_path: Path to the JTL file

        Returns:
            True if the file exists and its extension (case-insensitive) is
            one of ``.jtl``, ``.xml`` or ``.csv``; False otherwise
        """
        path = Path(file_path)
        return path.exists() and path.suffix.lower() in ('.jtl', '.xml', '.csv')

    @staticmethod
    def detect_format(file_path: Union[str, Path]) -> str:
        """Detect whether the JTL file is in XML or CSV format.

        The first line is inspected first. If it is inconclusive, the file
        extension decides, and for ``.jtl`` files the first 1000 characters
        are examined as a last resort.

        XML is recognised either by the ``<?xml`` declaration or, because the
        declaration is optional in XML, by the ``<testResults`` root element.

        Args:
            file_path: Path to the JTL file

        Returns:
            'xml' or 'csv'

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the format cannot be determined
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        xml_openers = ('<?xml', '<testResults')
        csv_columns = ('timeStamp', 'elapsed')
        suffix = path.suffix.lower()

        with open(path, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()

            # Content sniffing on the first line takes precedence over the extension.
            if first_line.startswith(xml_openers):
                return 'xml'
            if ',' in first_line and any(col in first_line for col in csv_columns):
                return 'csv'

            # First line was inconclusive: trust an explicit extension.
            if suffix == '.xml':
                return 'xml'
            if suffix == '.csv':
                return 'csv'
            if suffix == '.jtl':
                # .jtl can be either format, so look at a larger prefix.
                f.seek(0)
                content = f.read(1000)
                if '<?xml' in content or content.lstrip().startswith('<testResults'):
                    return 'xml'
                if ',' in content and any(col in content for col in csv_columns):
                    return 'csv'

        raise ValueError(f"Could not determine format of file: {file_path}")
```
--------------------------------------------------------------------------------
/tests/test_xml_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the XML JTL parser.
"""
import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from analyzer.parser.xml_parser import XMLJTLParser
class TestXMLJTLParser(unittest.TestCase):
    """Exercise XMLJTLParser against small JTL fixtures written to disk."""

    def setUp(self):
        """Create a parser and a temporary three-sample XML JTL file."""
        self.parser = XMLJTLParser()

        # Two successful samples followed by one failed sample.
        self.xml_content = """<?xml version="1.0" encoding="UTF-8"?>
<testResults version="1.2">
<httpSample t="1234" lt="1000" ts="1625097600000" s="true" lb="Home Page" rc="200" rm="" tn="Thread Group 1-1" by="1234" sby="1234" ct="800"/>
<httpSample t="2345" lt="2000" ts="1625097601000" s="true" lb="Login Page" rc="200" rm="" tn="Thread Group 1-1" by="2345" sby="2345" ct="900"/>
<httpSample t="3456" lt="3000" ts="1625097602000" s="false" lb="API Call" rc="500" rm="Internal Server Error" tn="Thread Group 1-2" by="3456" sby="345" ct="1000"/>
</testResults>
"""
        # delete=False keeps the file on disk after it is closed; tearDown removes it.
        with tempfile.NamedTemporaryFile(suffix='.xml', mode='w', delete=False) as fixture:
            fixture.write(self.xml_content)
        self.xml_file = fixture

    def tearDown(self):
        """Remove the temporary fixture file."""
        os.unlink(self.xml_file.name)

    def test_parse_file(self):
        """A well-formed file yields every sample with its attributes mapped."""
        parsed = self.parser.parse_file(self.xml_file.name)
        samples = parsed.samples

        self.assertEqual(len(samples), 3)

        # First sample: successful request with all attributes present.
        first = samples[0]
        self.assertEqual(first.label, "Home Page")
        self.assertEqual(first.response_time, 1234)
        self.assertTrue(first.success)
        self.assertEqual(first.response_code, "200")
        self.assertEqual(first.thread_name, "Thread Group 1-1")
        self.assertEqual(first.bytes_received, 1234)
        self.assertEqual(first.bytes_sent, 1234)
        self.assertEqual(first.latency, 1000)
        self.assertEqual(first.connect_time, 800)

        # Third sample: the failing request.
        failed = samples[2]
        self.assertEqual(failed.label, "API Call")
        self.assertEqual(failed.response_time, 3456)
        self.assertFalse(failed.success)
        self.assertEqual(failed.response_code, "500")
        self.assertEqual(failed.error_message, "Internal Server Error")

        # The overall window runs from the first to the last sample timestamp.
        self.assertEqual(parsed.start_time, datetime.fromtimestamp(1625097600))
        self.assertEqual(parsed.end_time, datetime.fromtimestamp(1625097602))

    def test_file_not_found(self):
        """A missing path raises FileNotFoundError."""
        missing = '/path/to/nonexistent/file.xml'
        with self.assertRaises(FileNotFoundError):
            self.parser.parse_file(missing)

    def test_invalid_format(self):
        """A .xml file whose content is not XML raises ValueError."""
        with tempfile.NamedTemporaryFile(suffix='.xml', mode='w', delete=False) as bogus:
            bogus.write("This is not XML")
        bogus_path = bogus.name

        try:
            with self.assertRaises(ValueError):
                self.parser.parse_file(bogus_path)
        finally:
            os.unlink(bogus_path)
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/analyzer/parser/xml_parser.py:
--------------------------------------------------------------------------------
```python
"""
XML parser for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in XML format using SAX for efficient processing.
"""
import xml.sax
from datetime import datetime
from pathlib import Path
from typing import Union
from analyzer.models import Sample, TestResults
from analyzer.parser.base import JTLParser
class JMeterXMLHandler(xml.sax.ContentHandler):
    """SAX handler for JMeter XML results.

    Each ``httpSample`` or ``sample`` element is converted into a Sample
    built from the element's attributes and added to the TestResults object
    supplied at construction time. All other elements are ignored.
    """

    def __init__(self, test_results: TestResults):
        """Initialize the handler.

        Args:
            test_results: TestResults object to populate
        """
        super().__init__()
        self.test_results = test_results

    def startElement(self, tag, attributes):
        """Process start element.

        A sample whose attributes cannot be converted is reported on stderr
        and skipped, so one bad record does not abort the whole parse.

        Args:
            tag: Element tag name
            attributes: Element attributes
        """
        import sys  # local import: keeps this block self-contained

        # Only httpSample and sample elements carry sample data.
        if tag not in ("httpSample", "sample"):
            return

        try:
            # "ts" is in milliseconds; datetime.fromtimestamp expects seconds.
            ts = int(attributes.get("ts", "0")) / 1000
            timestamp = datetime.fromtimestamp(ts)

            sample = Sample(
                timestamp=timestamp,
                label=attributes.get("lb", ""),
                response_time=int(attributes.get("t", "0")),
                success=attributes.get("s", "true").lower() == "true",
                response_code=attributes.get("rc", ""),
                error_message=attributes.get("rm", ""),
                thread_name=attributes.get("tn", ""),
                bytes_received=int(attributes.get("by", "0")),
                bytes_sent=int(attributes.get("sby", "0")),
                latency=int(attributes.get("lt", "0")),
                connect_time=int(attributes.get("ct", "0"))
            )

            self.test_results.add_sample(sample)

        except (ValueError, KeyError, OverflowError, OSError) as e:
            # OverflowError/OSError come from datetime.fromtimestamp when the
            # timestamp is out of range. The message goes to stderr because
            # stdout may be in use as a protocol channel by the hosting server.
            print(f"Error parsing sample: {e}", file=sys.stderr)
class XMLJTLParser(JTLParser):
    """Parser for JMeter JTL files in XML format.

    Uses a SAX parser so the file is processed as a stream of events rather
    than being loaded as a whole document tree.
    """

    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file in XML format.

        Args:
            file_path: Path to the JTL file

        Returns:
            TestResults object containing parsed data

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file is not detected as XML, or if the XML
                parser fails (the original exception is attached as the cause)
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Reject files that the shared detector does not classify as XML.
        format_name = self.detect_format(path)
        if format_name != "xml":
            raise ValueError(f"Invalid file format. Expected XML, got {format_name}")

        test_results = TestResults()

        # Namespace processing is switched off, so the handler receives
        # plain element names through startElement.
        parser = xml.sax.make_parser()
        parser.setFeature(xml.sax.handler.feature_namespaces, 0)
        parser.setContentHandler(JMeterXMLHandler(test_results))

        try:
            parser.parse(str(path))
        except Exception as e:
            # Surface every parser failure as the documented ValueError while
            # keeping the underlying error reachable via __cause__.
            raise ValueError(f"Error parsing XML file: {e}") from e

        return test_results
```
--------------------------------------------------------------------------------
/tests/test_analyzer_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the analyzer parser module.
"""
import os
import tempfile
import unittest
from pathlib import Path
from analyzer.parser.base import JTLParser
class TestJTLParserBase(unittest.TestCase):
    """Checks for the file validation and format detection helpers of JTLParser."""

    # File bodies shared by the format-detection tests.
    _XML_BODY = '<?xml version="1.0" encoding="UTF-8"?>\n<testResults>\n</testResults>'
    _CSV_BODY = (
        'timeStamp,elapsed,label,responseCode,success\n'
        '1625097600000,100,Test,200,true\n'
    )

    def _detect_from_temp(self, suffix, body):
        """Write *body* to a temp file with *suffix* and return the detected format.

        The file is always removed, including when detection raises.
        """
        with tempfile.NamedTemporaryFile(suffix=suffix, mode='w', delete=False) as handle:
            handle.write(body)
        try:
            return JTLParser.detect_format(handle.name)
        finally:
            os.unlink(handle.name)

    def test_validate_file_exists(self):
        """An existing .jtl file validates."""
        with tempfile.NamedTemporaryFile(suffix='.jtl') as handle:
            verdict = JTLParser.validate_file(handle.name)
            self.assertTrue(verdict)

    def test_validate_file_not_exists(self):
        """A path that does not exist fails validation."""
        missing = '/path/to/nonexistent/file.jtl'
        self.assertFalse(JTLParser.validate_file(missing))

    def test_validate_file_extension(self):
        """Only .jtl, .xml and .csv extensions validate; .txt does not."""
        expectations = (
            ('.jtl', True),
            ('.xml', True),
            ('.csv', True),
            ('.txt', False),
        )
        for suffix, should_pass in expectations:
            with tempfile.NamedTemporaryFile(suffix=suffix) as handle:
                verdict = JTLParser.validate_file(handle.name)
            check = self.assertTrue if should_pass else self.assertFalse
            check(verdict)

    def test_detect_format_xml(self):
        """XML content in a .xml file is detected as 'xml'."""
        self.assertEqual(self._detect_from_temp('.xml', self._XML_BODY), 'xml')

    def test_detect_format_csv(self):
        """CSV content in a .csv file is detected as 'csv'."""
        self.assertEqual(self._detect_from_temp('.csv', self._CSV_BODY), 'csv')

    def test_detect_format_jtl_xml(self):
        """XML content in a .jtl file is detected as 'xml'."""
        self.assertEqual(self._detect_from_temp('.jtl', self._XML_BODY), 'xml')

    def test_detect_format_jtl_csv(self):
        """CSV content in a .jtl file is detected as 'csv'."""
        self.assertEqual(self._detect_from_temp('.jtl', self._CSV_BODY), 'csv')

    def test_detect_format_file_not_found(self):
        """Detecting the format of a missing file raises FileNotFoundError."""
        missing = '/path/to/nonexistent/file.jtl'
        with self.assertRaises(FileNotFoundError):
            JTLParser.detect_format(missing)

    def test_detect_format_unknown(self):
        """Content that is neither XML nor CSV raises ValueError."""
        with self.assertRaises(ValueError):
            self._detect_from_temp('.txt', 'This is not a JTL file\n')
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_jmeter_server.py:
--------------------------------------------------------------------------------
```python
import sys
import types
import os
import tempfile
import unittest
from unittest import mock
# Stub external dependencies before importing jmeter_server.
# Empty placeholder modules are registered in sys.modules so that imports of
# the `mcp` package hierarchy resolve to these objects instead of a real
# installation (presumably jmeter_server imports mcp.server.fastmcp; confirm).
# NOTE(review): the entries are never removed, so they stay in place for any
# test module imported later in the same process.
sys.modules['mcp'] = types.ModuleType('mcp')
sys.modules['mcp.server'] = types.ModuleType('mcp.server')
fastmcp_mod = types.ModuleType('mcp.server.fastmcp')
class FastMCP:
    """Do-nothing stand-in for the real FastMCP server class."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments."""
        return None

    def tool(self, *args, **kwargs):
        """Return a decorator that hands the decorated function back unchanged."""
        return lambda func: func

    def run(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None
# Attach the stub class to the placeholder module and register the module,
# so `mcp.server.fastmcp.FastMCP` resolves to the stub.
fastmcp_mod.FastMCP = FastMCP
sys.modules['mcp.server.fastmcp'] = fastmcp_mod
# Stub dotenv.load_dotenv with a no-op that takes no arguments
sys.modules['dotenv'] = types.ModuleType('dotenv')
sys.modules['dotenv'].load_dotenv = lambda: None
# Must stay below the sys.modules stubs: importing earlier would look up the
# real `mcp` / `dotenv` packages instead of the placeholders.
import jmeter_server
class TestRunJMeter(unittest.IsolatedAsyncioTestCase):
    """Tests for ``jmeter_server.run_jmeter`` and the ``execute_jmeter_test*`` entry points."""

    def _make_jmx_file(self):
        """Create an empty temporary ``.jmx`` file and return its path.

        Removal is registered with ``addCleanup`` so the file is deleted even
        when an assertion in the test fails (a trailing ``os.unlink`` would be
        skipped in that case and leak the file).
        """
        with tempfile.NamedTemporaryFile(suffix=".jmx", delete=False) as tmp:
            test_file = tmp.name
        self.addCleanup(os.unlink, test_file)
        return test_file

    async def test_file_not_found(self):
        """A missing test plan yields the 'not found' error string."""
        result = await jmeter_server.run_jmeter("nonexistent.jmx")
        self.assertEqual(
            result,
            "Error: Test file not found: nonexistent.jmx"
        )

    async def test_invalid_file_type(self):
        """An existing file without the .jmx suffix is rejected."""
        with tempfile.NamedTemporaryFile(suffix=".txt") as tmp:
            result = await jmeter_server.run_jmeter(tmp.name)
            self.assertEqual(
                result,
                f"Error: Invalid file type. Expected .jmx file: {tmp.name}"
            )

    @mock.patch('jmeter_server.subprocess.run')
    async def test_non_gui_success(self, mock_run):
        """Non-GUI mode returns the subprocess stdout when the exit code is 0."""
        test_file = self._make_jmx_file()

        # Fake successful subprocess result
        class DummyResult:
            returncode = 0
            stdout = "Success output"
            stderr = ""

        mock_run.return_value = DummyResult()
        result = await jmeter_server.run_jmeter(test_file, non_gui=True)
        self.assertEqual(result, "Success output")

    @mock.patch('jmeter_server.subprocess.run')
    async def test_non_gui_failure(self, mock_run):
        """Non-GUI mode reports stderr when the exit code is non-zero."""
        test_file = self._make_jmx_file()

        # Fake failing subprocess result
        class DummyResult:
            returncode = 1
            stdout = ""
            stderr = "Error occurred"

        mock_run.return_value = DummyResult()
        result = await jmeter_server.run_jmeter(test_file, non_gui=True)
        self.assertEqual(
            result,
            "Error executing JMeter test:\nError occurred"
        )

    @mock.patch('jmeter_server.subprocess.Popen')
    async def test_gui_mode(self, mock_popen):
        """GUI mode starts a process via Popen and reports a successful launch."""
        test_file = self._make_jmx_file()
        result = await jmeter_server.run_jmeter(test_file, non_gui=False)
        self.assertEqual(result, "JMeter GUI launched successfully")
        mock_popen.assert_called()

    @mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
    async def test_execute_jmeter_test_default(self, mock_run_jmeter):
        """execute_jmeter_test calls run_jmeter with non_gui=True by default."""
        mock_run_jmeter.return_value = "wrapped output"
        result = await jmeter_server.execute_jmeter_test("file.jmx")
        mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=True)
        self.assertEqual(result, "wrapped output")

    @mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
    async def test_execute_jmeter_test_gui(self, mock_run_jmeter):
        """gui_mode=True is forwarded to run_jmeter as non_gui=False."""
        mock_run_jmeter.return_value = "gui output"
        result = await jmeter_server.execute_jmeter_test("file.jmx", gui_mode=True)
        mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=False)
        self.assertEqual(result, "gui output")

    @mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
    async def test_execute_jmeter_test_non_gui(self, mock_run_jmeter):
        """execute_jmeter_test_non_gui calls run_jmeter with non_gui=True."""
        mock_run_jmeter.return_value = "non-gui output"
        result = await jmeter_server.execute_jmeter_test_non_gui("file.jmx")
        mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=True)
        self.assertEqual(result, "non-gui output")
class TestUnexpectedError(unittest.IsolatedAsyncioTestCase):
    """Checks how run_jmeter reports an exception raised by Path.resolve."""

    async def test_unexpected_error(self):
        """A failure inside Path.resolve comes back as an 'Unexpected error' string."""
        failure = Exception("resolve error")
        with mock.patch('jmeter_server.Path.resolve', side_effect=failure):
            outcome = await jmeter_server.run_jmeter("any.jmx")
        expected_prefix = "Unexpected error: resolve error"
        self.assertTrue(outcome.startswith(expected_prefix))
```
--------------------------------------------------------------------------------
/analyzer/parser/csv_parser.py:
--------------------------------------------------------------------------------
```python
"""
CSV parser for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in CSV format using streaming for efficient processing.
"""
import csv
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union
from analyzer.models import Sample, TestResults
from analyzer.parser.base import JTLParser
class CSVJTLParser(JTLParser):
    """Parser for JMeter JTL files in CSV format.

    Rows are read one at a time with ``csv.DictReader`` and converted to
    ``Sample`` objects; a row that cannot be converted is reported and
    skipped so that one bad line does not discard the rest of the file.
    """
    
    # Default column mappings for JMeter CSV output
    # (Sample field name -> CSV header name)
    DEFAULT_COLUMN_MAPPINGS = {
        'timestamp': 'timeStamp',
        'label': 'label',
        'response_time': 'elapsed',
        'success': 'success',
        'response_code': 'responseCode',
        'error_message': 'responseMessage',
        'thread_name': 'threadName',
        'bytes_received': 'bytes',
        'bytes_sent': 'sentBytes',
        'latency': 'Latency',
        'connect_time': 'Connect'
    }

    # Errors that mark a single row as unparseable. TypeError/AttributeError
    # occur when DictReader pads a short row with None; OverflowError/OSError
    # can come from datetime.fromtimestamp on out-of-range values.
    _ROW_ERRORS = (ValueError, KeyError, TypeError, AttributeError,
                   OverflowError, OSError)
    
    def __init__(self, column_mappings: Union[Dict[str, str], None] = None):
        """Initialize the parser.
        
        Args:
            column_mappings: Custom column mappings from Sample field name
                to CSV header name. When omitted (or empty), the
                DEFAULT_COLUMN_MAPPINGS are used.
        """
        self.column_mappings = column_mappings or self.DEFAULT_COLUMN_MAPPINGS

    @staticmethod
    def _optional_int(value, default: int = 0) -> int:
        """Convert an optional numeric field, using *default* when it is blank or missing."""
        if value is None or value == "":
            return default
        return int(value)

    def _row_to_sample(self, row: Dict[str, str]) -> Sample:
        """Build a Sample from one DictReader row using the configured mappings."""
        cols = self.column_mappings

        # Parse timestamp (convert from milliseconds to seconds)
        ts = int(row[cols['timestamp']]) / 1000
        timestamp = datetime.fromtimestamp(ts)

        # Parse success (convert string to boolean)
        success = row[cols['success']].lower() in ("true", "1")

        return Sample(
            timestamp=timestamp,
            label=row[cols['label']],
            response_time=int(row[cols['response_time']]),
            success=success,
            response_code=row[cols['response_code']],
            # DictReader yields None (not a missing key) for absent trailing
            # fields, so normalise None to the intended defaults here.
            error_message=row.get(cols['error_message']) or "",
            thread_name=row.get(cols['thread_name']) or "",
            bytes_received=self._optional_int(row.get(cols['bytes_received'])),
            bytes_sent=self._optional_int(row.get(cols['bytes_sent'])),
            latency=self._optional_int(row.get(cols['latency'])),
            connect_time=self._optional_int(row.get(cols['connect_time']))
        )
    
    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file in CSV format.
        
        Args:
            file_path: Path to the JTL file
            
        Returns:
            TestResults object containing parsed data
            
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file is not detected as CSV, has no header
                row, lacks a mapped column, or cannot be read (the original
                exception is attached as ``__cause__``)
        """
        path = Path(file_path)
        
        # Validate file
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Detect format
        format_name = self.detect_format(path)
        if format_name != "csv":
            raise ValueError(f"Invalid file format. Expected CSV, got {format_name}")
        
        # Create test results object
        test_results = TestResults()
        
        try:
            # Open and parse the CSV file
            with open(path, 'r', newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                
                # Validate that required columns are present
                if not reader.fieldnames:
                    raise ValueError("CSV file has no header row")
                
                # Every mapped CSV column must exist in the header
                missing_columns = [
                    csv_field
                    for csv_field in self.column_mappings.values()
                    if csv_field not in reader.fieldnames
                ]
                if missing_columns:
                    raise ValueError(f"CSV file is missing required columns: {', '.join(missing_columns)}")
                
                # Process each row
                for row in reader:
                    try:
                        test_results.add_sample(self._row_to_sample(row))
                    except self._ROW_ERRORS as e:
                        # Log error but continue processing
                        print(f"Error parsing row: {e}")
                        continue
        
        except Exception as e:
            # Callers only expect ValueError; keep the root cause chained.
            raise ValueError(f"Error parsing CSV file: {e}") from e
        
        return test_results
```
--------------------------------------------------------------------------------
/tests/test_csv_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the CSV JTL parser.
"""
import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from analyzer.parser.csv_parser import CSVJTLParser
class TestCSVJTLParser(unittest.TestCase):
    """Behavioural checks for CSVJTLParser.parse_file."""

    # Header and data rows of the sample JTL fixture.
    _HEADER = "timeStamp,elapsed,label,responseCode,success,threadName,bytes,sentBytes,Latency,Connect,responseMessage"
    _ROW_HOME = "1625097600000,1234,Home Page,200,true,Thread Group 1-1,12345,1234,1000,800,"
    _ROW_LOGIN = "1625097601000,2345,Login Page,200,true,Thread Group 1-1,23456,2345,2000,900,"
    _ROW_API = "1625097602000,3456,API Call,500,false,Thread Group 1-2,3456,345,3000,1000,Internal Server Error"

    @staticmethod
    def _write_temp_csv(text):
        """Write *text* to a new .csv temp file and return its path."""
        with tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False) as handle:
            handle.write(text)
        return handle.name

    def _assert_rejected(self, text):
        """Assert that parsing a temp file containing *text* raises ValueError."""
        bad_path = self._write_temp_csv(text)
        try:
            with self.assertRaises(ValueError):
                self.parser.parse_file(bad_path)
        finally:
            os.unlink(bad_path)

    def setUp(self):
        """Create a default parser and a three-row CSV fixture on disk."""
        self.parser = CSVJTLParser()

        fixture_lines = [self._HEADER, self._ROW_HOME, self._ROW_LOGIN, self._ROW_API]
        self.csv_content = "\n".join(fixture_lines) + "\n"

        self.csv_file = tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False)
        with self.csv_file as handle:
            handle.write(self.csv_content)

    def tearDown(self):
        """Remove the fixture file."""
        os.unlink(self.csv_file.name)

    def test_parse_file(self):
        """All three rows are parsed with the expected field values."""
        parsed = self.parser.parse_file(self.csv_file.name)

        self.assertEqual(len(parsed.samples), 3)

        # First row: a successful request
        first = parsed.samples[0]
        self.assertTrue(first.success)
        expected_first = {
            "label": "Home Page",
            "response_time": 1234,
            "response_code": "200",
            "thread_name": "Thread Group 1-1",
            "bytes_received": 12345,
            "bytes_sent": 1234,
            "latency": 1000,
            "connect_time": 800,
        }
        for attr, wanted in expected_first.items():
            self.assertEqual(getattr(first, attr), wanted)

        # Third row: a failed request
        third = parsed.samples[2]
        self.assertFalse(third.success)
        expected_third = {
            "label": "API Call",
            "response_time": 3456,
            "response_code": "500",
            "error_message": "Internal Server Error",
        }
        for attr, wanted in expected_third.items():
            self.assertEqual(getattr(third, attr), wanted)

        # The overall time window spans the first and last timestamps
        self.assertEqual(parsed.start_time, datetime.fromtimestamp(1625097600))
        self.assertEqual(parsed.end_time, datetime.fromtimestamp(1625097602))

    def test_file_not_found(self):
        """A missing path raises FileNotFoundError."""
        missing = '/path/to/nonexistent/file.csv'
        with self.assertRaises(FileNotFoundError):
            self.parser.parse_file(missing)

    def test_invalid_format(self):
        """A .csv file whose content is not CSV raises ValueError."""
        self._assert_rejected("This is not CSV")

    def test_missing_columns(self):
        """A CSV file lacking mapped columns raises ValueError."""
        self._assert_rejected("timestamp,label,responseCode\n" "1625097600000,Home Page,200\n")

    def test_custom_column_mappings(self):
        """A parser built with explicit column mappings parses a matching file."""
        # Standard header names are used so the file passes format detection.
        custom_csv_content = self._HEADER + "\n" + self._ROW_HOME + "\n"
        csv_path = self._write_temp_csv(custom_csv_content)

        try:
            custom_mappings = {
                'timestamp': 'timeStamp',
                'label': 'label',
                'response_time': 'elapsed',
                'success': 'success',
                'response_code': 'responseCode',
                'error_message': 'responseMessage',
                'thread_name': 'threadName',
                'bytes_received': 'bytes',
                'bytes_sent': 'sentBytes',
                'latency': 'Latency',
                'connect_time': 'Connect'
            }
            custom_parser = CSVJTLParser(column_mappings=custom_mappings)

            parsed = custom_parser.parse_file(csv_path)
            self.assertEqual(len(parsed.samples), 1)
            self.assertEqual(parsed.samples[0].label, "Home Page")
        finally:
            os.unlink(csv_path)
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_analyzer_models.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the analyzer models.
"""
import unittest
from datetime import datetime
from analyzer.models import Sample, TestResults, OverallMetrics, EndpointMetrics
class TestSample(unittest.TestCase):
    """Checks that Sample stores the values it is constructed with."""

    def test_sample_creation(self):
        """Every constructor argument is readable back as an attribute."""
        created_at = datetime.now()
        field_values = {
            "timestamp": created_at,
            "label": "Test Sample",
            "response_time": 100,
            "success": True,
            "response_code": "200",
            "error_message": None,
            "thread_name": "Thread Group 1-1",
            "bytes_sent": 150,
            "bytes_received": 1024,
            "latency": 50,
            "connect_time": 20,
        }
        sample = Sample(**field_values)

        # Boolean and None fields get their dedicated assertions.
        self.assertTrue(sample.success)
        self.assertIsNone(sample.error_message)

        equality_checked = (
            "timestamp",
            "label",
            "response_time",
            "response_code",
            "thread_name",
            "bytes_sent",
            "bytes_received",
            "latency",
            "connect_time",
        )
        for attr in equality_checked:
            self.assertEqual(getattr(sample, attr), field_values[attr])
class TestTestResults(unittest.TestCase):
    """Checks how TestResults tracks samples and its start/end times."""

    @staticmethod
    def _ok_sample(label, when, elapsed):
        """Build a successful HTTP 200 sample."""
        return Sample(
            timestamp=when,
            label=label,
            response_time=elapsed,
            success=True,
            response_code="200"
        )

    def _assert_state(self, results, count, start, end):
        """Assert the sample count and the recorded time window."""
        self.assertEqual(len(results.samples), count)
        self.assertEqual(results.start_time, start)
        self.assertEqual(results.end_time, end)

    def test_add_sample(self):
        """Adding samples grows the list and widens the time window both ways."""
        results = TestResults()
        self.assertEqual(len(results.samples), 0)

        noon = datetime(2023, 1, 1, 12, 0, 0)
        eleven = datetime(2023, 1, 1, 11, 0, 0)
        one_pm = datetime(2023, 1, 1, 13, 0, 0)

        # First sample defines both ends of the window.
        results.add_sample(self._ok_sample("Sample 1", noon, 100))
        self._assert_state(results, 1, noon, noon)

        # An earlier timestamp moves the start only.
        results.add_sample(self._ok_sample("Sample 2", eleven, 200))
        self._assert_state(results, 2, eleven, noon)

        # A later timestamp moves the end only.
        results.add_sample(self._ok_sample("Sample 3", one_pm, 300))
        self._assert_state(results, 3, eleven, one_pm)
class TestMetrics(unittest.TestCase):
    """Checks that the metrics containers store their constructor values."""

    def _assert_fields(self, obj, expected):
        """Assert that each attribute named in *expected* equals its value."""
        for attr, wanted in expected.items():
            self.assertEqual(getattr(obj, attr), wanted)

    def test_overall_metrics(self):
        """OverallMetrics exposes every value passed to its constructor."""
        values = dict(
            total_samples=100,
            error_count=5,
            error_rate=5.0,
            average_response_time=250.5,
            median_response_time=220.0,
            percentile_90=400.0,
            percentile_95=450.0,
            percentile_99=500.0,
            min_response_time=100.0,
            max_response_time=600.0,
            throughput=10.5,
            test_duration=60.0,
        )
        self._assert_fields(OverallMetrics(**values), values)

    def test_endpoint_metrics(self):
        """EndpointMetrics exposes every value passed to its constructor."""
        values = dict(
            endpoint="Test Endpoint",
            total_samples=50,
            error_count=2,
            error_rate=4.0,
            average_response_time=200.5,
            median_response_time=180.0,
            percentile_90=350.0,
            percentile_95=400.0,
            percentile_99=450.0,
            min_response_time=90.0,
            max_response_time=500.0,
            throughput=8.5,
            test_duration=60.0,
        )
        self._assert_fields(EndpointMetrics(**values), values)
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_insights_generator.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the insights generator.
"""
import unittest
from analyzer.insights.generator import InsightsGenerator
from analyzer.models import Bottleneck, Recommendation
class TestInsightsGenerator(unittest.TestCase):
    """Behavioural checks for InsightsGenerator."""

    _RECOMMENDATION_FIELDS = (
        "issue",
        "recommendation",
        "expected_impact",
        "implementation_difficulty",
    )
    _INSIGHT_FIELDS = ("topic", "description", "supporting_data")

    @staticmethod
    def _mentions(texts, needle):
        """Return True when any text contains *needle*, ignoring case of the text."""
        return any(needle in text.lower() for text in texts)

    def _assert_fields_present(self, items, field_names):
        """Assert that none of the named attributes is None on any item."""
        for item in items:
            for name in field_names:
                self.assertIsNotNone(getattr(item, name))

    def setUp(self):
        """Build the generator plus bottleneck, error and concurrency fixtures."""
        self.generator = InsightsGenerator()

        # (endpoint, metric_type, value, threshold, severity)
        bottleneck_specs = [
            ("Slow Endpoint", "response_time", 500.0, 200.0, "high"),
            ("Medium Endpoint", "response_time", 300.0, 200.0, "medium"),
            ("Error Endpoint", "error_rate", 15.0, 5.0, "high"),
        ]
        self.bottlenecks = [
            Bottleneck(
                endpoint=endpoint,
                metric_type=metric_type,
                value=value,
                threshold=threshold,
                severity=severity
            )
            for endpoint, metric_type, value, threshold, severity in bottleneck_specs
        ]

        error_types = {
            "Connection timeout": 10,
            "500 Internal Server Error": 5,
            "404 Not Found": 3
        }
        spike = {
            "type": "spike",
            "timestamp": "2023-01-01T12:00:00",
            "error_count": 8
        }
        self.error_analysis = {
            "error_types": error_types,
            "error_patterns": [spike]
        }

        self.concurrency_analysis = {
            "correlation": 0.85,
            "degradation_threshold": 50,
            "has_degradation": True
        }

    def test_generate_bottleneck_recommendations(self):
        """Response-time and error-rate bottlenecks both yield recommendations."""
        produced = self.generator.generate_bottleneck_recommendations(self.bottlenecks)

        # At least one recommendation per bottleneck type
        self.assertGreaterEqual(len(produced), 2)

        issues = [entry.issue for entry in produced]
        self.assertTrue(self._mentions(issues, "response time"))
        self.assertTrue(self._mentions(issues, "error rate"))

        self._assert_fields_present(produced, self._RECOMMENDATION_FIELDS)

    def test_generate_error_recommendations(self):
        """Each error type in the analysis yields a recommendation."""
        produced = self.generator.generate_error_recommendations(self.error_analysis)

        # At least one recommendation per error type
        self.assertGreaterEqual(len(produced), 3)

        issues = [entry.issue for entry in produced]
        self.assertTrue(self._mentions(issues, "timeout"))
        self.assertTrue(self._mentions(issues, "server"))

        self._assert_fields_present(produced, self._RECOMMENDATION_FIELDS)

    def test_generate_scaling_insights(self):
        """Scaling insights cover both correlation and degradation."""
        produced = self.generator.generate_scaling_insights(self.concurrency_analysis)

        self.assertGreaterEqual(len(produced), 2)

        topics = [entry.topic for entry in produced]
        self.assertTrue(self._mentions(topics, "correlation"))
        self.assertTrue(self._mentions(topics, "degradation"))

        self._assert_fields_present(produced, self._INSIGHT_FIELDS)

    def test_prioritize_recommendations(self):
        """Prioritised output is sorted by descending score and fully populated."""
        # (issue, recommendation, expected_impact, implementation_difficulty)
        specs = [
            ("Critical response time issues", "Optimize database queries",
             "Significant reduction in response times", "medium"),
            ("Moderate error rates", "Add error handling",
             "Reduction in error rates", "low"),
            ("Minor UI issues", "Fix UI bugs",
             "Improved user experience", "high"),
        ]
        recommendations = [
            Recommendation(
                issue=issue,
                recommendation=advice,
                expected_impact=impact,
                implementation_difficulty=difficulty
            )
            for issue, advice, impact, difficulty in specs
        ]

        prioritized = self.generator.prioritize_recommendations(recommendations)

        self.assertEqual(len(prioritized), 3)

        # Each entry scores at least as high as the one after it
        for higher, lower in zip(prioritized, prioritized[1:]):
            self.assertGreaterEqual(higher["priority_score"], lower["priority_score"])

        for entry in prioritized:
            for key in ("recommendation", "priority_score", "priority_level"):
                self.assertIn(key, entry)

    def test_empty_inputs(self):
        """Empty inputs give empty results, except scaling insights."""
        generator = self.generator
        self.assertEqual(len(generator.generate_bottleneck_recommendations([])), 0)
        self.assertEqual(len(generator.generate_error_recommendations({})), 0)
        # Scaling insights should still produce at least one entry
        self.assertGreaterEqual(len(generator.generate_scaling_insights({})), 1)
        self.assertEqual(len(generator.prioritize_recommendations([])), 0)
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_bottleneck_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the bottleneck analyzer.
"""
import unittest
from datetime import datetime, timedelta
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.models import EndpointMetrics, TimeSeriesMetrics
class TestBottleneckAnalyzer(unittest.TestCase):
    """Exercise BottleneckAnalyzer against a small hand-built metrics fixture."""

    def setUp(self):
        """Build the analyzer plus the endpoint and time-series fixtures."""
        self.analyzer = BottleneckAnalyzer()

        # One row per endpoint:
        # (name, errors, error %, avg, median, p90, p95, p99, min, max)
        endpoint_rows = [
            ("Fast Endpoint", 0, 0.0, 100.0, 95.0, 150.0, 180.0, 200.0, 50.0, 250.0),
            ("Medium Endpoint", 2, 2.0, 200.0, 190.0, 300.0, 350.0, 400.0, 100.0, 450.0),
            ("Slow Endpoint", 5, 5.0, 500.0, 450.0, 800.0, 900.0, 1000.0, 200.0, 1200.0),
            ("Error Endpoint", 15, 15.0, 300.0, 280.0, 450.0, 500.0, 600.0, 150.0, 700.0),
        ]
        self.endpoint_metrics = {}
        for (name, errors, error_pct, avg, median,
             p90, p95, p99, fastest, slowest) in endpoint_rows:
            self.endpoint_metrics[name] = EndpointMetrics(
                endpoint=name,
                total_samples=100,
                error_count=errors,
                error_rate=error_pct,
                average_response_time=avg,
                median_response_time=median,
                percentile_90=p90,
                percentile_95=p95,
                percentile_99=p99,
                min_response_time=fastest,
                max_response_time=slowest,
                throughput=10.0,
                test_duration=10.0,
            )

        # Ten points, 5 seconds apart: threads and response time ramp up
        # together, and the last two points carry a 5% error rate.
        start = datetime(2023, 1, 1, 12, 0, 0)
        self.time_series_metrics = []
        for step in range(10):
            self.time_series_metrics.append(
                TimeSeriesMetrics(
                    timestamp=start + timedelta(seconds=step * 5),
                    active_threads=step + 1,
                    throughput=10.0,
                    average_response_time=100.0 + step * 20,
                    error_rate=5.0 if step >= 8 else 0.0,
                )
            )

        # Inject a response-time spike at index 5 (timestamp 12:00:25).
        self.time_series_metrics[5].average_response_time = 500.0

    def test_identify_slow_endpoints(self):
        """Slow endpoints are reported, slowest first."""
        # A high threshold factor should single out the Slow Endpoint.
        strict = self.analyzer.identify_slow_endpoints(
            self.endpoint_metrics, threshold_factor=2.0
        )
        self.assertEqual(len(strict), 1)
        only = strict[0]
        self.assertEqual(only.endpoint, "Slow Endpoint")
        self.assertEqual(only.metric_type, "response_time")
        # At this factor the severity is expected to be medium or high.
        self.assertIn(only.severity, ["medium", "high"])

        # A low threshold factor should flag more endpoints, with the
        # Slow Endpoint still ranked first.
        lenient = self.analyzer.identify_slow_endpoints(
            self.endpoint_metrics, threshold_factor=0.8
        )
        self.assertGreaterEqual(len(lenient), 2)
        self.assertEqual(lenient[0].endpoint, "Slow Endpoint")

    def test_identify_error_prone_endpoints(self):
        """Error-prone endpoints are reported, highest error rate first."""
        flagged = self.analyzer.identify_error_prone_endpoints(
            self.endpoint_metrics, threshold_error_rate=3.0
        )

        # Both endpoints above 3% must be present, worst one leading.
        self.assertEqual(len(flagged), 2)
        worst, runner_up = flagged

        self.assertEqual(worst.endpoint, "Error Endpoint")
        self.assertEqual(worst.metric_type, "error_rate")
        self.assertEqual(worst.severity, "high")

        self.assertEqual(runner_up.endpoint, "Slow Endpoint")
        self.assertEqual(runner_up.metric_type, "error_rate")
        self.assertEqual(runner_up.severity, "medium")

        # Raising the threshold to 10% leaves only the Error Endpoint.
        flagged = self.analyzer.identify_error_prone_endpoints(
            self.endpoint_metrics, threshold_error_rate=10.0
        )
        self.assertEqual(len(flagged), 1)
        self.assertEqual(flagged[0].endpoint, "Error Endpoint")

    def test_detect_anomalies(self):
        """The injected response-time spike is reported as the first anomaly."""
        anomalies = self.analyzer.detect_anomalies(self.time_series_metrics)

        # At least the spike has to be detected.
        self.assertGreaterEqual(len(anomalies), 1)

        # The spike sits at index 5 of the series, i.e. 25 seconds in.
        first = anomalies[0]
        self.assertEqual(first.timestamp, datetime(2023, 1, 1, 12, 0, 25))
        # The deviation should be significant in either direction.
        self.assertGreater(abs(first.deviation_percentage), 50)

    def test_analyze_concurrency_impact(self):
        """Correlation is high for the ramping fixture and low for a flat one."""
        # The fixture has thread counts and response times rising together.
        ramp_analysis = self.analyzer.analyze_concurrency_impact(self.time_series_metrics)
        self.assertGreater(ramp_analysis["correlation"], 0.5)

        # Same thread ramp, but with a constant response time.
        start = datetime(2023, 1, 1, 12, 0, 0)
        flat_series = []
        for step in range(10):
            flat_series.append(
                TimeSeriesMetrics(
                    timestamp=start + timedelta(seconds=step * 5),
                    active_threads=step + 1,
                    throughput=10.0,
                    average_response_time=200.0,
                    error_rate=0.0,
                )
            )

        flat_analysis = self.analyzer.analyze_concurrency_impact(flat_series)
        self.assertLess(flat_analysis["correlation"], 0.5)
        self.assertFalse(flat_analysis["has_degradation"])

    def test_empty_inputs(self):
        """Empty inputs yield empty results and a neutral concurrency analysis."""
        analyzer = self.analyzer

        self.assertEqual(len(analyzer.identify_slow_endpoints({})), 0)
        self.assertEqual(len(analyzer.identify_error_prone_endpoints({})), 0)
        self.assertEqual(len(analyzer.detect_anomalies([])), 0)

        empty_analysis = analyzer.analyze_concurrency_impact([])
        self.assertEqual(empty_analysis["correlation"], 0)
        self.assertFalse(empty_analysis["has_degradation"])
# Allow this test module to be executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/windsurf_db_reader.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Windsurf LevelDB Reader
A tool to read and explore Windsurf's local storage database.
"""
import os
import json
import sys
from pathlib import Path
# plyvel is a hard requirement for this script: when it cannot be imported,
# print an install hint and exit with status 1 instead of showing a traceback.
try:
    import plyvel
except ImportError:
    print("plyvel not installed. Install with: pip install plyvel")
    sys.exit(1)
class WindsurfDBReader:
    """Explorer for Windsurf's Local Storage LevelDB database.

    All methods report their findings on stdout. ``connect()`` must succeed
    before any of the read methods is used; ``close()`` releases the handle.
    """

    def __init__(self, db_path=None):
        """Remember the database location without opening it.

        Args:
            db_path: Directory of the LevelDB database. When omitted, the
                default Windsurf Next location on macOS is used.
        """
        if db_path is None:
            # Default path for Windsurf Next on macOS
            home = Path.home()
            db_path = home / "Library/Application Support/Windsurf - Next/Local Storage/leveldb"

        self.db_path = Path(db_path)
        self.db = None

    @staticmethod
    def _looks_like_json(value):
        """Return True when *value* (bytes) starts like a JSON object or array."""
        return value.startswith((b'{', b'['))

    def connect(self):
        """Open the LevelDB database.

        Returns:
            True when the database was opened, False otherwise (the error is
            printed rather than raised).
        """
        try:
            self.db = plyvel.DB(str(self.db_path), create_if_missing=False)
            print(f"✅ Connected to database: {self.db_path}")
            return True
        except Exception as e:
            print(f"❌ Failed to connect to database: {e}")
            return False

    def close(self):
        """Close the database connection, if one is open."""
        if self.db:
            self.db.close()
            # Drop the handle so later calls report "not connected" instead
            # of operating on a closed database.
            self.db = None
            print("🔒 Database connection closed")

    def list_all_keys(self, limit=50):
        """Print up to *limit* entries (key, value preview, value size).

        Entries that fail to print count towards *limit* as well, so at most
        *limit* entries are ever visited.
        """
        if not self.db:
            print("❌ Database not connected")
            return

        print(f"\n📋 Listing up to {limit} keys:")
        for index, (key, value) in enumerate(self.db, start=1):
            if index > limit:
                break
            try:
                key_str = key.decode('utf-8', errors='ignore')
                # Slicing already handles values shorter than 100 bytes.
                value_preview = str(value[:100])
                print(f"{index:3d}. Key: {key_str}")
                print(f"     Value preview: {value_preview}")
                print(f"     Value size: {len(value)} bytes")
                print("-" * 50)
            except Exception as e:
                print(f"Error reading key {index}: {e}")

    def search_keys(self, pattern):
        """Print every key that contains *pattern* (case-insensitive).

        For values that look like JSON, a short pretty-printed preview is
        shown as well.
        """
        if not self.db:
            print("❌ Database not connected")
            return

        print(f"\n🔍 Searching for keys containing '{pattern}':")
        found = 0
        for key, value in self.db:
            try:
                key_str = key.decode('utf-8', errors='ignore')
                if pattern.lower() in key_str.lower():
                    print(f"Found: {key_str}")
                    print(f"Value size: {len(value)} bytes")

                    # The JSON preview is best-effort: values that merely
                    # start like JSON but do not parse are silently skipped.
                    try:
                        if self._looks_like_json(value):
                            json_data = json.loads(value.decode('utf-8'))
                            print(f"JSON preview: {json.dumps(json_data, indent=2)[:200]}...")
                    except (ValueError, RecursionError):
                        # ValueError covers JSONDecodeError and UnicodeDecodeError.
                        pass

                    print("-" * 50)
                    found += 1
            except Exception as e:
                print(f"Error searching key: {e}")

        if found == 0:
            print(f"No keys found containing '{pattern}'")

    def get_value(self, key):
        """Look up *key* and print its value.

        Args:
            key: The key as ``str`` (encoded as UTF-8) or ``bytes``.

        Returns:
            The parsed JSON document when the value is valid JSON, the
            decoded text when it is valid UTF-8, the raw bytes otherwise,
            or None when the key is missing, the database is not connected,
            or the lookup fails.
        """
        if not self.db:
            print("❌ Database not connected")
            return None

        try:
            key_bytes = key.encode('utf-8') if isinstance(key, str) else key
            value = self.db.get(key_bytes)

            if value is None:
                print(f"❌ Key '{key}' not found")
                return None

            print(f"✅ Found value for key '{key}':")
            print(f"Size: {len(value)} bytes")

            # Try to decode as JSON
            try:
                if self._looks_like_json(value):
                    json_data = json.loads(value.decode('utf-8'))
                    print("JSON content:")
                    print(json.dumps(json_data, indent=2))
                    return json_data
            except (ValueError, RecursionError):
                # Not valid JSON after all; fall through to the text attempt.
                pass

            # Try to decode as text
            try:
                text = value.decode('utf-8')
            except UnicodeDecodeError:
                print("Binary content (showing first 200 bytes):")
                print(value[:200])
                return value
            print("Text content:")
            print(text)
            return text

        except Exception as e:
            print(f"❌ Error getting value: {e}")
            return None

    def export_to_json(self, output_file="windsurf_db_export.json", max_entries=1000):
        """Write up to *max_entries* database entries to a JSON file.

        Keys are decoded as UTF-8 with undecodable bytes dropped. Values that
        start like JSON are parsed; if parsing fails they are replaced by a
        ``<binary data: N bytes>`` placeholder. All other values are decoded
        as UTF-8 with undecodable bytes dropped.
        """
        if not self.db:
            print("❌ Database not connected")
            return

        export_data = {}
        count = 0

        print(f"📤 Exporting database to {output_file}...")

        for key, value in self.db:
            if count >= max_entries:
                break

            try:
                key_str = key.decode('utf-8', errors='ignore')

                # Try to decode value as JSON first
                try:
                    if self._looks_like_json(value):
                        value_data = json.loads(value.decode('utf-8'))
                    else:
                        value_data = value.decode('utf-8', errors='ignore')
                except (ValueError, RecursionError):
                    value_data = f"<binary data: {len(value)} bytes>"

                export_data[key_str] = value_data
                count += 1

            except Exception as e:
                print(f"Error exporting entry {count}: {e}")

        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, ensure_ascii=False)
            print(f"✅ Exported {count} entries to {output_file}")
        except Exception as e:
            print(f"❌ Error writing export file: {e}")
def main():
    """Run the interactive menu until the user exits.

    Connects to the default Windsurf database, then loops over a numbered
    menu. The loop ends on choice ``0``, on Ctrl-C, or when stdin reaches
    end-of-file; the database is closed in every case.
    """
    reader = WindsurfDBReader()

    if not reader.connect():
        return

    try:
        while True:
            print("\n" + "="*60)
            print("🌊 Windsurf Database Reader")
            print("="*60)
            print("1. List all keys (first 50)")
            print("2. Search keys by pattern")
            print("3. Get value by key")
            print("4. Export to JSON")
            print("5. Search for 'memory' related keys")
            print("6. Search for 'conversation' related keys")
            print("0. Exit")
            print("-"*60)

            choice = input("Enter your choice (0-6): ").strip()

            if choice == '0':
                break
            elif choice == '1':
                reader.list_all_keys()
            elif choice == '2':
                pattern = input("Enter search pattern: ").strip()
                if pattern:
                    reader.search_keys(pattern)
            elif choice == '3':
                key = input("Enter key: ").strip()
                if key:
                    reader.get_value(key)
            elif choice == '4':
                filename = input("Enter output filename (default: windsurf_db_export.json): ").strip()
                if not filename:
                    filename = "windsurf_db_export.json"
                reader.export_to_json(filename)
            elif choice == '5':
                reader.search_keys('memory')
            elif choice == '6':
                reader.search_keys('conversation')
            else:
                print("❌ Invalid choice. Please try again.")

    except (KeyboardInterrupt, EOFError):
        # input() raises EOFError when stdin is closed (Ctrl-D or exhausted
        # piped input); treat it like Ctrl-C instead of showing a traceback.
        print("\n👋 Goodbye!")
    finally:
        reader.close()
# Start the interactive menu only when run as a script, not on import.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_metrics_calculator.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the metrics calculator.
"""
import unittest
from datetime import datetime, timedelta
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.models import Sample, TestResults
class TestMetricsCalculator(unittest.TestCase):
    """Exercise MetricsCalculator against a fixed 15-sample fixture."""

    def setUp(self):
        """Create the calculator and a result set covering two endpoints."""
        self.calculator = MetricsCalculator()

        start = datetime(2023, 1, 1, 12, 0, 0)
        results = TestResults()

        # Endpoint1: ten successful samples, one per second,
        # response times 100, 110, ..., 190.
        for second, elapsed in enumerate(range(100, 200, 10)):
            results.add_sample(
                Sample(
                    timestamp=start + timedelta(seconds=second),
                    label="Endpoint1",
                    response_time=elapsed,
                    success=True,
                    response_code="200",
                )
            )

        # Endpoint2: five samples starting at second 10, response times
        # 200, 220, ..., 280; only the last one fails.
        for position, elapsed in enumerate(range(200, 300, 20)):
            succeeded = position < 4
            results.add_sample(
                Sample(
                    timestamp=start + timedelta(seconds=position + 10),
                    label="Endpoint2",
                    response_time=elapsed,
                    success=succeeded,
                    response_code="200" if succeeded else "500",
                    error_message="" if succeeded else "Internal Server Error",
                )
            )

        self.test_results = results

    def test_calculate_overall_metrics(self):
        """Overall metrics aggregate all 15 samples."""
        overall = self.calculator.calculate_overall_metrics(self.test_results)

        # Counts and error rate
        self.assertEqual(overall.total_samples, 15)
        self.assertEqual(overall.error_count, 1)
        self.assertAlmostEqual(overall.error_rate, 100 * 1 / 15)

        # Response time statistics
        all_times = list(range(100, 200, 10)) + list(range(200, 300, 20))
        self.assertAlmostEqual(overall.average_response_time, sum(all_times) / len(all_times))
        self.assertAlmostEqual(overall.median_response_time, 170)  # 8th of the 15 sorted values
        self.assertAlmostEqual(overall.min_response_time, 100)
        self.assertAlmostEqual(overall.max_response_time, 280)

        # Percentiles
        self.assertAlmostEqual(overall.percentile_90, 260)
        self.assertAlmostEqual(overall.percentile_95, 270)
        self.assertAlmostEqual(overall.percentile_99, 278)

        # Duration spans first to last sample (14 s); throughput is 15 samples over it.
        self.assertEqual(overall.test_duration, 14)
        self.assertAlmostEqual(overall.throughput, 15 / 14)

    def test_calculate_endpoint_metrics(self):
        """Per-endpoint metrics are keyed by label and computed separately."""
        by_endpoint = self.calculator.calculate_endpoint_metrics(self.test_results)

        # Both labels must be present, and nothing else.
        self.assertEqual(len(by_endpoint), 2)
        self.assertIn("Endpoint1", by_endpoint)
        self.assertIn("Endpoint2", by_endpoint)

        # Endpoint1: ten samples, no errors, mean of 100..190 is 145.
        first = by_endpoint["Endpoint1"]
        self.assertEqual(first.endpoint, "Endpoint1")
        self.assertEqual(first.total_samples, 10)
        self.assertEqual(first.error_count, 0)
        self.assertEqual(first.error_rate, 0)
        self.assertAlmostEqual(first.average_response_time, 145)

        # Endpoint2: five samples, one error (20%), mean of 200..280 is 240.
        second = by_endpoint["Endpoint2"]
        self.assertEqual(second.endpoint, "Endpoint2")
        self.assertEqual(second.total_samples, 5)
        self.assertEqual(second.error_count, 1)
        self.assertAlmostEqual(second.error_rate, 20)
        self.assertAlmostEqual(second.average_response_time, 240)

    def test_calculate_time_series_metrics(self):
        """Samples are bucketed into fixed-width intervals."""
        series = self.calculator.calculate_time_series_metrics(
            self.test_results, interval_seconds=5
        )

        # Expected buckets: 0-5s, 5-10s, 10-15s.
        self.assertEqual(len(series), 3)

        # First bucket (0-5s): five error-free Endpoint1 samples.
        opening = series[0]
        self.assertEqual(opening.timestamp, datetime(2023, 1, 1, 12, 0, 0))
        self.assertEqual(opening.active_threads, 0)  # fixture has no thread names
        self.assertAlmostEqual(opening.throughput, 5 / 5)
        self.assertAlmostEqual(
            opening.average_response_time, (100 + 110 + 120 + 130 + 140) / 5
        )
        self.assertEqual(opening.error_rate, 0)

        # Third bucket (10-15s): the five Endpoint2 samples, one failed.
        closing = series[2]
        self.assertEqual(closing.timestamp, datetime(2023, 1, 1, 12, 0, 10))
        self.assertAlmostEqual(closing.throughput, 5 / 5)
        self.assertAlmostEqual(
            closing.average_response_time, (200 + 220 + 240 + 260 + 280) / 5
        )
        self.assertAlmostEqual(closing.error_rate, 20)

    def test_compare_with_benchmarks(self):
        """Each benchmark yields benchmark/actual/difference/percent_difference."""
        overall = self.calculator.calculate_overall_metrics(self.test_results)

        targets = {
            "average_response_time": 150,
            "error_rate": 0,
            "throughput": 2
        }

        report = self.calculator.compare_with_benchmarks(overall, targets)

        # Every benchmarked metric must appear in the report.
        for metric_name in ("average_response_time", "error_rate", "throughput"):
            self.assertIn(metric_name, report)

        # Average response time against a benchmark of 150.
        rt_entry = report["average_response_time"]
        rt_actual = overall.average_response_time
        self.assertEqual(rt_entry["benchmark"], 150)
        self.assertAlmostEqual(rt_entry["actual"], rt_actual)
        self.assertAlmostEqual(rt_entry["difference"], rt_actual - 150)
        self.assertAlmostEqual(rt_entry["percent_difference"], (rt_actual - 150) / 150 * 100)

        # Error rate against a zero benchmark: the percentage is infinite.
        err_entry = report["error_rate"]
        self.assertEqual(err_entry["benchmark"], 0)
        self.assertAlmostEqual(err_entry["actual"], overall.error_rate)
        self.assertAlmostEqual(err_entry["difference"], overall.error_rate)
        self.assertEqual(err_entry["percent_difference"], float('inf'))

        # Throughput against a benchmark of 2.
        tp_entry = report["throughput"]
        tp_actual = overall.throughput
        self.assertEqual(tp_entry["benchmark"], 2)
        self.assertAlmostEqual(tp_entry["actual"], tp_actual)
        self.assertAlmostEqual(tp_entry["difference"], tp_actual - 2)
        self.assertAlmostEqual(tp_entry["percent_difference"], (tp_actual - 2) / 2 * 100)

    def test_empty_results(self):
        """All three calculations reject a result set without samples."""
        no_samples = TestResults()
        calculations = (
            self.calculator.calculate_overall_metrics,
            self.calculator.calculate_endpoint_metrics,
            self.calculator.calculate_time_series_metrics,
        )

        for calculate in calculations:
            with self.assertRaises(ValueError):
                calculate(no_samples)

    def test_invalid_interval(self):
        """Zero and negative interval widths are rejected."""
        for bad_interval in (0, -1):
            with self.assertRaises(ValueError):
                self.calculator.calculate_time_series_metrics(
                    self.test_results, interval_seconds=bad_interval
                )
# Allow this test module to be executed directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/jmeter_report.html:
--------------------------------------------------------------------------------
```html
        <!DOCTYPE html>
        <html>
        <head>
            <title>JMeter Test Results Analysis</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                h1, h2, h3 { color: #333; }
                table { border-collapse: collapse; width: 100%; margin-bottom: 20px; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                tr:nth-child(even) { background-color: #f9f9f9; }
                .chart { margin: 20px 0; max-width: 100%; }
                .section { margin-bottom: 30px; }
                .severity-high { color: #d9534f; }
                .severity-medium { color: #f0ad4e; }
                .severity-low { color: #5bc0de; }
            </style>
        </head>
        <body>
            <h1>JMeter Test Results Analysis</h1>
            
            <div class="section">
                <h2>Summary</h2>
                <table>
                    <tr><th>Metric</th><th>Value</th></tr>
                    <tr><td>Total Samples</td><td>96</td></tr>
                    <tr><td>Error Count</td><td>0</td></tr>
                    <tr><td>Error Rate</td><td>0.00%</td></tr>
                    <tr><td>Average Response Time</td><td>222.18 ms</td></tr>
                    <tr><td>Median Response Time</td><td>209.00 ms</td></tr>
                    <tr><td>90th Percentile</td><td>335.50 ms</td></tr>
                    <tr><td>95th Percentile</td><td>345.25 ms</td></tr>
                    <tr><td>99th Percentile</td><td>356.00 ms</td></tr>
                    <tr><td>Min Response Time</td><td>105.00 ms</td></tr>
                    <tr><td>Max Response Time</td><td>356.00 ms</td></tr>
                    <tr><td>Throughput</td><td>0.01 requests/second</td></tr>
                    <tr><td>Start Time</td><td>2025-04-09 20:38:16.160000</td></tr>
                    <tr><td>End Time</td><td>2025-04-09 23:17:02.381000</td></tr>
                    <tr><td>Duration</td><td>9526.22 seconds</td></tr>
                </table>
            </div>
        
                <div class="section">
                    <h2>Endpoint Analysis</h2>
                    <table>
                        <tr>
                            <th>Endpoint</th>
                            <th>Samples</th>
                            <th>Errors</th>
                            <th>Error Rate</th>
                            <th>Avg Response Time</th>
                            <th>95th Percentile</th>
                            <th>Throughput</th>
                        </tr>
                
                        <tr>
                            <td>Login as u1</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>174.62 ms</td>
                            <td>271.10 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action = none</td>
                            <td>16</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>253.12 ms</td>
                            <td>350.25 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action a</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>235.25 ms</td>
                            <td>327.15 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action b</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>224.00 ms</td>
                            <td>317.00 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action c</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>231.00 ms</td>
                            <td>349.35 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action d</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>215.50 ms</td>
                            <td>270.65 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Logout</td>
                            <td>16</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>214.56 ms</td>
                            <td>356.00 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Action = &lt;EOF&gt;</td>
                            <td>16</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>224.12 ms</td>
                            <td>341.00 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                        <tr>
                            <td>Login as u2</td>
                            <td>8</td>
                            <td>0</td>
                            <td>0.00%</td>
                            <td>202.12 ms</td>
                            <td>297.30 ms</td>
                            <td>0.00 req/s</td>
                        </tr>
                    
                    </table>
                </div>
                
                <div class="section">
                    <h2>Bottleneck Analysis</h2>
                
                    <h3>Slow Endpoints</h3>
                    <table>
                        <tr>
                            <th>Endpoint</th>
                            <th>Response Time</th>
                            <th>Threshold</th>
                            <th>Severity</th>
                        </tr>
                    
                            <tr>
                                <td>Logout</td>
                                <td>356.00 ms</td>
                                <td>329.05 ms</td>
                                <td class="severity-low">LOW</td>
                            </tr>
                        
                            <tr>
                                <td>Action = none</td>
                                <td>350.25 ms</td>
                                <td>329.05 ms</td>
                                <td class="severity-low">LOW</td>
                            </tr>
                        
                            <tr>
                                <td>Action c</td>
                                <td>349.35 ms</td>
                                <td>329.05 ms</td>
                                <td class="severity-low">LOW</td>
                            </tr>
                        
                            <tr>
                                <td>Action = &lt;EOF&gt;</td>
                                <td>341.00 ms</td>
                                <td>329.05 ms</td>
                                <td class="severity-low">LOW</td>
                            </tr>
                        
                    </table>
                    
                </div>
                
                <div class="section">
                    <h2>Insights and Recommendations</h2>
                
                    <h3>Scaling Insights</h3>
                    <table>
                        <tr>
                            <th>Topic</th>
                            <th>Description</th>
                        </tr>
                    
                            <tr>
                                <td>No Correlation with Concurrency</td>
                                <td>There is little to no correlation between the number of concurrent users and response times, suggesting good scalability</td>
                            </tr>
                        
                            <tr>
                                <td>No Performance Degradation Detected</td>
                                <td>No significant performance degradation was detected with increasing concurrent users within the tested range</td>
                            </tr>
                        
                    </table>
                    
                </div>
                
        </body>
        </html>
        
```
--------------------------------------------------------------------------------
/windsurf_db_reader_alternative.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Windsurf LevelDB Reader - Alternative Implementation
A tool to read and explore Windsurf's local storage database using pure Python.
"""
import os
import json
import sys
import struct
from pathlib import Path
class SimpleLevelDBReader:
    """
    A simple LevelDB reader that can extract basic data without full LevelDB library.

    This does not parse the LevelDB format. It scans the raw bytes of every
    ``*.ldb`` file for runs of printable ASCII and keeps the runs that look like
    JSON or mention one of a few keywords, so anything that is not stored as
    printable ASCII is invisible to it.
    """

    # A run must be strictly longer than this many characters to be considered.
    _MIN_STRING_LENGTH = 10
    # Case-insensitive substrings that make a run worth keeping.
    _KEYWORDS = ('memory', 'conversation', 'windsurf')

    def __init__(self, db_path=None):
        """Remember the LevelDB directory to scan.

        Args:
            db_path: Directory containing the ``*.ldb`` files. When ``None`` the
                Windsurf Next local-storage location on macOS is used.
        """
        if db_path is None:
            # Default path for Windsurf Next on macOS
            home = Path.home()
            db_path = home / "Library/Application Support/Windsurf - Next/Local Storage/leveldb"

        self.db_path = Path(db_path)

    def read_ldb_files(self):
        """Read .ldb files and try to extract readable data.

        Returns:
            A list of record dicts (see ``_extract_strings_from_ldb``) gathered
            from every ``*.ldb`` file directly inside ``self.db_path``. The list
            is empty when the directory is missing or holds no ``.ldb`` files;
            in both cases a message is printed.
        """
        if not self.db_path.exists():
            print(f"❌ Database path does not exist: {self.db_path}")
            return []

        ldb_files = list(self.db_path.glob("*.ldb"))
        if not ldb_files:
            print("❌ No .ldb files found")
            return []

        print(f"📁 Found {len(ldb_files)} .ldb files")

        all_data = []
        for ldb_file in ldb_files:
            print(f"📖 Reading {ldb_file.name}...")
            all_data.extend(self._extract_strings_from_ldb(ldb_file))

        return all_data

    def _is_relevant(self, text):
        """Return True when *text* looks like JSON or mentions a tracked keyword."""
        if text.startswith(('{', '[')):
            return True
        lowered = text.lower()
        return any(keyword in lowered for keyword in self._KEYWORDS)

    def _extract_strings_from_ldb(self, file_path):
        """Extract readable strings from an LDB file.

        Args:
            file_path: ``pathlib.Path`` of the file to scan.

        Returns:
            A list of dicts with the keys ``file`` (file name), ``offset`` (byte
            offset where the run starts), ``content`` (the run itself) and
            ``type`` (label from ``_guess_content_type``). Only runs of printable
            ASCII longer than ``_MIN_STRING_LENGTH`` that pass ``_is_relevant``
            are returned. A read failure is reported on stdout and yields an
            empty list.
        """
        # Local import: the module's import block does not provide ``re``.
        import re

        try:
            with open(file_path, 'rb') as f:
                content = f.read()
        except OSError as e:
            print(f"❌ Error reading {file_path}: {e}")
            return []

        # One pass over the bytes: every maximal run of printable ASCII
        # (0x20-0x7e) that is long enough. A run that ends at end-of-file is
        # matched like any other, so it goes through the same relevance filter.
        run_pattern = rb'[\x20-\x7e]{%d,}' % (self._MIN_STRING_LENGTH + 1)

        extracted_data = []
        for match in re.finditer(run_pattern, content):
            text = match.group().decode('ascii')
            if not self._is_relevant(text):
                continue
            extracted_data.append({
                'file': file_path.name,
                'offset': match.start(),
                'content': text,
                'type': self._guess_content_type(text),
            })

        return extracted_data

    def _guess_content_type(self, content):
        """Guess the type of content.

        The checks run in order and the first match wins: JSON object, JSON
        array, then keyword-based labels, falling back to ``'text'``.
        """
        content_lower = content.lower()

        if content.startswith('{') and content.endswith('}'):
            return 'json_object'
        if content.startswith('[') and content.endswith(']'):
            return 'json_array'
        if 'memory' in content_lower:
            return 'memory_related'
        if 'conversation' in content_lower:
            return 'conversation_related'
        if any(keyword in content_lower for keyword in ['windsurf', 'cascade', 'user', 'assistant']):
            return 'windsurf_related'
        return 'text'

    def search_data(self, data, pattern):
        """Search extracted data for a pattern.

        Args:
            data: Record dicts as produced by ``read_ldb_files``.
            pattern: Substring to look for, matched case-insensitively.

        Returns:
            The records whose ``content`` contains *pattern*, in input order.
        """
        pattern_lower = pattern.lower()
        return [item for item in data if pattern_lower in item['content'].lower()]

    def export_data(self, data, output_file="windsurf_extracted_data.json"):
        """Export extracted data to JSON.

        Failures are reported on stdout rather than raised.
        """
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            print(f"✅ Exported {len(data)} items to {output_file}")
        except Exception as e:
            print(f"❌ Error exporting data: {e}")

    def analyze_data(self, data):
        """Analyze the extracted data.

        Prints the number of records per content type and per file, followed by
        a preview of the first five records.
        """
        if not data:
            print("❌ No data to analyze")
            return

        print(f"\n📊 Analysis of {len(data)} extracted items:")
        print("-" * 50)

        # Count by type
        type_counts = {}
        for item in data:
            item_type = item['type']
            type_counts[item_type] = type_counts.get(item_type, 0) + 1

        print("📈 Content types:")
        for content_type, count in sorted(type_counts.items()):
            print(f"  {content_type}: {count}")

        # Count by file
        file_counts = {}
        for item in data:
            file_name = item['file']
            file_counts[file_name] = file_counts.get(file_name, 0) + 1

        print("\n📁 Items per file:")
        for file_name, count in sorted(file_counts.items()):
            print(f"  {file_name}: {count}")

        # Show some examples
        print("\n🔍 Sample content:")
        for i, item in enumerate(data[:5]):  # Show first 5 items
            content = item['content']
            content_preview = content[:100] + "..." if len(content) > 100 else content
            print(f"  {i+1}. [{item['type']}] {content_preview}")
def main():
    """Main function with interactive menu.

    Runs a prompt loop on stdin/stdout until the user chooses ``0``, presses
    Ctrl-C, or stdin is closed. Option 1 extracts data with
    ``SimpleLevelDBReader``; options 2-4 operate on the result of the most
    recent successful extraction and refuse to run before one exists.
    """
    reader = SimpleLevelDBReader()

    print("🌊 Windsurf Database Reader (Alternative)")
    print("=" * 60)
    print("This tool extracts readable strings from LevelDB files.")
    print("It may not capture all data but can find JSON and text content.")
    print("=" * 60)

    # Result of the last successful extraction. Kept as a local so the menu does
    # not leak state into module globals.
    extracted_data = None

    try:
        while True:
            print("\nOptions:")
            print("1. Extract all readable data")
            print("2. Search for specific pattern")
            print("3. Analyze extracted data")
            print("4. Export data to JSON")
            print("0. Exit")
            print("-" * 40)

            choice = input("Enter your choice (0-4): ").strip()

            if choice == '0':
                break

            if choice == '1':
                print("🔄 Extracting data from LevelDB files...")
                data = reader.read_ldb_files()
                if data:
                    reader.analyze_data(data)
                    # Store data for other operations
                    extracted_data = data
                else:
                    print("❌ No readable data found")
                continue

            if choice not in ('2', '3', '4'):
                print("❌ Invalid choice. Please try again.")
                continue

            # Options 2-4 all need previously extracted data.
            if extracted_data is None:
                print("❌ Please extract data first (option 1)")
                continue

            if choice == '2':
                pattern = input("Enter search pattern: ").strip()
                if pattern:
                    results = reader.search_data(extracted_data, pattern)
                    print(f"\n🔍 Found {len(results)} matches for '{pattern}':")
                    for i, item in enumerate(results[:10]):  # Show first 10 matches
                        content = item['content']
                        content_preview = content[:200] + "..." if len(content) > 200 else content
                        print(f"\n{i+1}. File: {item['file']}, Type: {item['type']}")
                        print(f"Content: {content_preview}")
                        print("-" * 40)
            elif choice == '3':
                reader.analyze_data(extracted_data)
            else:  # choice == '4'
                filename = input("Enter output filename (default: windsurf_extracted_data.json): ").strip()
                if not filename:
                    filename = "windsurf_extracted_data.json"
                reader.export_data(extracted_data, filename)

    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed (e.g. piped input ran out); exit as for Ctrl-C.
        print("\n👋 Goodbye!")
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/analyzer/metrics/calculator.py:
--------------------------------------------------------------------------------
```python
"""
Metrics calculator for JMeter test results.
This module provides functionality for calculating performance metrics
from JMeter test results, including overall metrics, endpoint-specific metrics,
and time series metrics.
"""
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from analyzer.models import (EndpointMetrics, OverallMetrics, Sample,
                           TestResults, TimeSeriesMetrics)
class MetricsCalculator:
    """Calculator for performance metrics from test results."""

    def calculate_overall_metrics(self, test_results: TestResults) -> OverallMetrics:
        """Calculate overall metrics for the entire test.

        Args:
            test_results: TestResults object containing samples

        Returns:
            OverallMetrics object with calculated metrics. Duration and
            throughput are 0 when the results carry no start or end time.

        Raises:
            ValueError: If test_results contains no samples
        """
        samples = test_results.samples
        if not samples:
            raise ValueError("Cannot calculate metrics for empty test results")

        total_samples = len(samples)
        response_times = [sample.response_time for sample in samples]
        error_count = sum(1 for sample in samples if not sample.success)

        # Calculate duration
        if test_results.start_time and test_results.end_time:
            duration = (test_results.end_time - test_results.start_time).total_seconds()
        else:
            duration = 0

        # Calculate throughput (requests per second)
        throughput = total_samples / duration if duration > 0 else 0

        # Percentiles need the values in ascending order
        response_times_sorted = sorted(response_times)

        return OverallMetrics(
            total_samples=total_samples,
            error_count=error_count,
            error_rate=(error_count / total_samples) * 100,
            average_response_time=statistics.mean(response_times),
            median_response_time=statistics.median(response_times),
            percentile_90=self._calculate_percentile(response_times_sorted, 90),
            percentile_95=self._calculate_percentile(response_times_sorted, 95),
            percentile_99=self._calculate_percentile(response_times_sorted, 99),
            min_response_time=min(response_times),
            max_response_time=max(response_times),
            throughput=throughput,
            test_duration=duration
        )

    def calculate_endpoint_metrics(self, test_results: TestResults) -> Dict[str, EndpointMetrics]:
        """Calculate metrics broken down by endpoint/sampler.

        Samples are grouped by their ``label``; each group is run through
        ``calculate_overall_metrics`` and the result copied into an
        EndpointMetrics object.

        Args:
            test_results: TestResults object containing samples

        Returns:
            Dictionary mapping endpoint names to EndpointMetrics objects

        Raises:
            ValueError: If test_results contains no samples
        """
        if not test_results.samples:
            raise ValueError("Cannot calculate metrics for empty test results")

        # Group samples by endpoint
        endpoints = defaultdict(list)
        for sample in test_results.samples:
            endpoints[sample.label].append(sample)

        # Calculate metrics for each endpoint
        endpoint_metrics = {}
        for endpoint, samples in endpoints.items():
            # Create a temporary TestResults object with only samples for this endpoint.
            # NOTE(review): duration/throughput per endpoint depend on add_sample
            # maintaining start_time/end_time - confirm in analyzer.models.
            temp_results = TestResults()
            for sample in samples:
                temp_results.add_sample(sample)

            overall_metrics = self.calculate_overall_metrics(temp_results)

            endpoint_metrics[endpoint] = EndpointMetrics(
                endpoint=endpoint,
                total_samples=overall_metrics.total_samples,
                error_count=overall_metrics.error_count,
                error_rate=overall_metrics.error_rate,
                average_response_time=overall_metrics.average_response_time,
                median_response_time=overall_metrics.median_response_time,
                percentile_90=overall_metrics.percentile_90,
                percentile_95=overall_metrics.percentile_95,
                percentile_99=overall_metrics.percentile_99,
                min_response_time=overall_metrics.min_response_time,
                max_response_time=overall_metrics.max_response_time,
                throughput=overall_metrics.throughput,
                test_duration=overall_metrics.test_duration
            )

        return endpoint_metrics

    def calculate_time_series_metrics(self, test_results: TestResults, 
                                     interval_seconds: int = 5) -> List[TimeSeriesMetrics]:
        """Calculate metrics over time using the specified interval.

        The test window is cut into consecutive half-open intervals
        ``[start + k*interval, start + (k+1)*interval)`` beginning at
        ``test_results.start_time``. The interval containing ``end_time`` is
        always included, so a sample stamped exactly at ``end_time`` is counted
        even when the test duration is an exact multiple of the interval.
        Intervals without samples produce no entry.

        Args:
            test_results: TestResults object containing samples
            interval_seconds: Time interval in seconds (default: 5)

        Returns:
            List of TimeSeriesMetrics objects, one for each non-empty interval,
            in chronological order

        Raises:
            ValueError: If test_results contains no samples, if interval_seconds
                is not positive (or too small to represent as a timedelta), or
                if the results lack a start or end time
        """
        if not test_results.samples:
            raise ValueError("Cannot calculate metrics for empty test results")

        if interval_seconds <= 0:
            raise ValueError("Interval must be positive")

        if not test_results.start_time or not test_results.end_time:
            raise ValueError("Test results must have start and end times")

        interval = timedelta(seconds=interval_seconds)
        if not interval:
            # Sub-microsecond values round to timedelta(0), which cannot be
            # used to partition time.
            raise ValueError("Interval must be positive")

        start_time = test_results.start_time
        buckets = self._bucket_samples(
            test_results.samples, start_time, test_results.end_time, interval
        )
        interval_duration = interval.total_seconds()

        # Calculate metrics for each non-empty interval
        time_series_metrics = []
        for index in sorted(buckets):
            samples = buckets[index]
            response_times = [sample.response_time for sample in samples]
            error_count = sum(1 for sample in samples if not sample.success)

            # Count active threads (approximation based on unique thread names)
            thread_names = {sample.thread_name for sample in samples if sample.thread_name}

            time_series_metrics.append(TimeSeriesMetrics(
                timestamp=start_time + index * interval,
                active_threads=len(thread_names),
                throughput=len(samples) / interval_duration,
                average_response_time=statistics.mean(response_times),
                error_rate=(error_count / len(samples)) * 100
            ))

        return time_series_metrics

    def _bucket_samples(self, samples, start_time, end_time, interval):
        """Group samples by the zero-based index of the interval they fall in.

        Args:
            samples: Objects exposing a ``timestamp`` datetime
            start_time: Start of interval 0
            end_time: End of the test; the interval containing it is the last one
            interval: Non-zero timedelta giving the interval width

        Returns:
            Dict mapping interval index to the list of samples in that interval.
            Samples before ``start_time`` or after the last interval are left out.
        """
        # timedelta // timedelta is exact integer floor division.
        last_index = max((end_time - start_time) // interval, 0)

        buckets = defaultdict(list)
        for sample in samples:
            index = (sample.timestamp - start_time) // interval
            if 0 <= index <= last_index:
                buckets[index].append(sample)

        return dict(buckets)

    def compare_with_benchmarks(self, metrics: OverallMetrics, 
                               benchmarks: Dict[str, float]) -> Dict[str, Dict[str, float]]:
        """Compare metrics with benchmarks.

        Args:
            metrics: OverallMetrics object
            benchmarks: Dictionary mapping metric names to benchmark values.
                Names that are not attributes of ``metrics`` are ignored.

        Returns:
            Dictionary mapping each compared metric name to a dict with the keys
            ``benchmark``, ``actual``, ``difference`` (actual - benchmark) and
            ``percent_difference`` (``inf`` when the benchmark is 0)
        """
        comparison = {}

        for metric_name, benchmark_value in benchmarks.items():
            if not hasattr(metrics, metric_name):
                continue

            actual_value = getattr(metrics, metric_name)
            difference = actual_value - benchmark_value
            percent_difference = (difference / benchmark_value) * 100 if benchmark_value != 0 else float('inf')

            comparison[metric_name] = {
                'benchmark': benchmark_value,
                'actual': actual_value,
                'difference': difference,
                'percent_difference': percent_difference
            }

        return comparison

    def _calculate_percentile(self, sorted_values: List[float], percentile: float) -> float:
        """Calculate a percentile from sorted values.

        Uses linear interpolation between the two closest ranks, with the
        rank computed as ``percentile / 100 * (len - 1)``.

        Args:
            sorted_values: List of values, sorted in ascending order
            percentile: Percentile to calculate (0-100)

        Returns:
            Percentile value, or 0 for an empty list
        """
        if not sorted_values:
            return 0

        # Fractional position of the percentile within the list
        index = (percentile / 100) * (len(sorted_values) - 1)

        # If index is an integer, return the value at that index
        if index.is_integer():
            return sorted_values[int(index)]

        # Otherwise, interpolate between the two nearest values
        lower_index = math.floor(index)
        upper_index = math.ceil(index)
        lower_value = sorted_values[lower_index]
        upper_value = sorted_values[upper_index]
        fraction = index - lower_index

        return lower_value + (upper_value - lower_value) * fraction
```
--------------------------------------------------------------------------------
/analyzer/bottleneck/analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Bottleneck analyzer for JMeter test results.
This module provides functionality for identifying performance bottlenecks
in JMeter test results, including slow endpoints, error-prone endpoints,
response time anomalies, and concurrency impact analysis.
"""
import statistics
from typing import Dict, List, Optional, Tuple
from analyzer.models import (Anomaly, Bottleneck, EndpointMetrics,
                           OverallMetrics, Sample, TestResults,
                           TimeSeriesMetrics)
class BottleneckAnalyzer:
    """Finds performance bottlenecks in already-computed JMeter metrics."""

    def identify_slow_endpoints(self, endpoint_metrics: Dict[str, EndpointMetrics], 
                               threshold_percentile: float = 95,
                               threshold_factor: float = 1.5) -> List[Bottleneck]:
        """Flag endpoints whose percentile response time exceeds a threshold.

        The threshold is the mean of all endpoints' average response times
        multiplied by ``threshold_factor``. For each endpoint the attribute
        ``percentile_<N>`` (N = ``int(threshold_percentile)``) is compared with
        it; when that attribute is missing the endpoint's average is used.

        Args:
            endpoint_metrics: Mapping of endpoint name to EndpointMetrics
            threshold_percentile: Which percentile attribute to inspect (default: 95)
            threshold_factor: Multiplier applied to the overall average (default: 1.5)

        Returns:
            Bottleneck objects ordered by severity (high, medium, low) and then
            by descending value; empty when nothing exceeds the threshold
        """
        if not endpoint_metrics:
            return []

        per_endpoint_averages = [
            stats.average_response_time for stats in endpoint_metrics.values()
        ]
        limit = statistics.mean(per_endpoint_averages) * threshold_factor
        percentile_attr = f"percentile_{int(threshold_percentile)}"

        flagged = []
        for name, stats in endpoint_metrics.items():
            observed = getattr(stats, percentile_attr, stats.average_response_time)
            if not (observed > limit):
                continue

            # Severity grows with how far the value overshoots the threshold.
            level = (
                "high" if observed > limit * 2
                else "medium" if observed > limit * 1.5
                else "low"
            )
            flagged.append(Bottleneck(
                endpoint=name,
                metric_type="response_time",
                value=observed,
                threshold=limit,
                severity=level
            ))

        return self._order_by_severity(flagged)

    def identify_error_prone_endpoints(self, endpoint_metrics: Dict[str, EndpointMetrics], 
                                      threshold_error_rate: float = 1.0) -> List[Bottleneck]:
        """Flag endpoints whose error rate reaches a threshold.

        Endpoints without any errors are never flagged. Severity is "high" from
        10%, "medium" from 5%, otherwise "low".

        Args:
            endpoint_metrics: Mapping of endpoint name to EndpointMetrics
            threshold_error_rate: Minimum error rate, in percent, to report (default: 1.0)

        Returns:
            Bottleneck objects ordered by severity and then by descending error rate
        """
        if not endpoint_metrics:
            return []

        flagged = []
        for name, stats in endpoint_metrics.items():
            if stats.error_count == 0:
                continue
            if not (stats.error_rate >= threshold_error_rate):
                continue

            if stats.error_rate >= 10.0:
                level = "high"
            elif stats.error_rate >= 5.0:
                level = "medium"
            else:
                level = "low"

            flagged.append(Bottleneck(
                endpoint=name,
                metric_type="error_rate",
                value=stats.error_rate,
                threshold=threshold_error_rate,
                severity=level
            ))

        return self._order_by_severity(flagged)

    def _order_by_severity(self, bottlenecks):
        """Return bottlenecks sorted high -> medium -> low, then by descending value."""
        rank = {"high": 0, "medium": 1, "low": 2}
        return sorted(bottlenecks, key=lambda item: (rank.get(item.severity, 3), -item.value))

    def detect_anomalies(self, time_series_metrics: List[TimeSeriesMetrics], 
                        z_score_threshold: float = 2.0) -> List[Anomaly]:
        """Report intervals whose average response time is a statistical outlier.

        An interval is an anomaly when the absolute z-score of its average
        response time, relative to all intervals, is at least
        ``z_score_threshold``.

        Args:
            time_series_metrics: List of TimeSeriesMetrics objects
            z_score_threshold: Minimum absolute z-score to report (default: 2.0)

        Returns:
            Anomaly objects ordered by descending absolute deviation percentage;
            empty when there is no input or no spread in the values
        """
        if not time_series_metrics:
            return []

        averages = [point.average_response_time for point in time_series_metrics]
        baseline = statistics.mean(averages)
        spread = statistics.stdev(averages) if len(averages) > 1 else 0

        # Without spread a z-score is undefined, so nothing can be an outlier.
        if spread == 0:
            return []

        found = []
        for point in time_series_metrics:
            z_score = (point.average_response_time - baseline) / spread
            if abs(z_score) >= z_score_threshold:
                found.append(Anomaly(
                    timestamp=point.timestamp,
                    endpoint="overall",  # Overall anomaly, not endpoint-specific
                    expected_value=baseline,
                    actual_value=point.average_response_time,
                    deviation_percentage=((point.average_response_time - baseline) / baseline) * 100
                ))

        return sorted(found, key=lambda item: abs(item.deviation_percentage), reverse=True)

    def analyze_concurrency_impact(self, time_series_metrics: List[TimeSeriesMetrics]) -> Dict:
        """Relate the number of active threads to response time.

        Args:
            time_series_metrics: List of TimeSeriesMetrics objects

        Returns:
            Dict with ``correlation`` (Pearson coefficient between active threads
            and average response time), ``degradation_threshold`` (first thread
            count whose mean response time is more than 50% above that of the
            next lower thread count, else 0) and ``has_degradation``. The
            threshold is only searched for when the correlation exceeds 0.5.
        """
        def neutral():
            return {"correlation": 0, "degradation_threshold": 0, "has_degradation": False}

        if not time_series_metrics:
            return neutral()

        threads = [point.active_threads for point in time_series_metrics]
        latencies = [point.average_response_time for point in time_series_metrics]

        # A constant thread count carries no information about concurrency.
        if len(set(threads)) <= 1:
            return neutral()

        try:
            correlation = self._calculate_correlation(threads, latencies)
        except (ValueError, ZeroDivisionError):
            correlation = 0

        result = {
            "correlation": correlation,
            "degradation_threshold": 0,
            "has_degradation": False
        }
        if not (correlation > 0.5):
            return result

        # Mean response time observed at each distinct thread count
        latencies_by_threads = {}
        for point in time_series_metrics:
            latencies_by_threads.setdefault(point.active_threads, []).append(
                point.average_response_time
            )
        mean_latency = {
            count: statistics.mean(values)
            for count, values in latencies_by_threads.items()
        }

        # Walk neighbouring thread counts in ascending order and stop at the
        # first jump of more than 50%.
        ordered_counts = sorted(mean_latency.keys())
        for lower, higher in zip(ordered_counts, ordered_counts[1:]):
            if mean_latency[higher] > mean_latency[lower] * 1.5:
                result["degradation_threshold"] = higher
                result["has_degradation"] = True
                break

        return result

    def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
        """Calculate Pearson correlation coefficient between two lists.

        Args:
            x: First list of values
            y: Second list of values

        Returns:
            Correlation coefficient (-1 to 1); 0 when the lists differ in
            length, hold fewer than two values, or either has no variance
        """
        if len(x) != len(y) or len(x) < 2:
            return 0

        mean_x = statistics.mean(x)
        mean_y = statistics.mean(y)

        # Deviations from the mean, reused for covariance and both variances
        dev_x = [xi - mean_x for xi in x]
        dev_y = [yi - mean_y for yi in y]

        covariance_sum = sum(dx * dy for dx, dy in zip(dev_x, dev_y))
        squares_x = sum(dx ** 2 for dx in dev_x)
        squares_y = sum(dy ** 2 for dy in dev_y)

        if squares_x == 0 or squares_y == 0:
            return 0

        return covariance_sum / ((squares_x ** 0.5) * (squares_y ** 0.5))
```
--------------------------------------------------------------------------------
/tests/test_visualization_engine.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the visualization engine.
"""
import os
import tempfile
import unittest
from datetime import datetime, timedelta
from analyzer.models import EndpointMetrics, TimeSeriesMetrics
from analyzer.visualization.engine import VisualizationEngine
class TestVisualizationEngine(unittest.TestCase):
    """Tests for the VisualizationEngine class.
    
    Every test gets a fresh VisualizationEngine that writes into its own
    temporary directory; the directory is deleted again after the test.
    """
    
    def setUp(self):
        """Set up test fixtures."""
        # Create a temporary directory for output files. The cleanup is
        # registered straight away so the directory is removed (recursively)
        # even when the rest of setUp or the test itself fails.
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        self.temp_dir = temp_dir.name
        self.engine = VisualizationEngine(output_dir=self.temp_dir)
        
        # Create time series metrics: ten points, five seconds apart, with
        # errors appearing only in the last two intervals.
        base_time = datetime(2023, 1, 1, 12, 0, 0)
        self.time_series_metrics = [
            TimeSeriesMetrics(
                timestamp=base_time + timedelta(seconds=i * 5),
                active_threads=i + 1,
                throughput=10.0 + i * 0.5,
                average_response_time=100.0 + i * 20,
                error_rate=0.0 if i < 8 else 5.0
            )
            for i in range(10)
        ]
        
        # Create endpoint metrics
        self.endpoint_metrics = {
            "Endpoint 1": EndpointMetrics(
                endpoint="Endpoint 1",
                total_samples=100,
                error_count=0,
                error_rate=0.0,
                average_response_time=100.0,
                median_response_time=95.0,
                percentile_90=150.0,
                percentile_95=180.0,
                percentile_99=200.0,
                min_response_time=50.0,
                max_response_time=250.0,
                throughput=10.0,
                test_duration=10.0
            ),
            "Endpoint 2": EndpointMetrics(
                endpoint="Endpoint 2",
                total_samples=100,
                error_count=5,
                error_rate=5.0,
                average_response_time=200.0,
                median_response_time=190.0,
                percentile_90=300.0,
                percentile_95=350.0,
                percentile_99=400.0,
                min_response_time=100.0,
                max_response_time=450.0,
                throughput=8.0,
                test_duration=10.0
            ),
            "Endpoint 3": EndpointMetrics(
                endpoint="Endpoint 3",
                total_samples=100,
                error_count=10,
                error_rate=10.0,
                average_response_time=300.0,
                median_response_time=280.0,
                percentile_90=450.0,
                percentile_95=500.0,
                percentile_99=600.0,
                min_response_time=150.0,
                max_response_time=700.0,
                throughput=5.0,
                test_duration=10.0
            )
        }
        
        # Create response times
        self.response_times = [100, 120, 130, 140, 150, 160, 170, 180, 190, 200,
                               220, 240, 260, 280, 300, 350, 400, 450, 500, 600]
        
        # Create analysis results in the "summary" / "detailed" dictionary
        # layout that create_html_report() is fed in the report test below.
        self.analysis_results = {
            "summary": {
                "total_samples": 300,
                "error_count": 15,
                "error_rate": 5.0,
                "average_response_time": 200.0,
                "median_response_time": 180.0,
                "percentile_90": 350.0,
                "percentile_95": 400.0,
                "percentile_99": 500.0,
                "min_response_time": 100.0,
                "max_response_time": 600.0,
                "throughput": 7.5,
                "start_time": datetime(2023, 1, 1, 12, 0, 0),
                "end_time": datetime(2023, 1, 1, 12, 0, 40),
                "duration": 40.0
            },
            "detailed": {
                "endpoints": {
                    "Endpoint 1": {
                        "total_samples": 100,
                        "error_count": 0,
                        "error_rate": 0.0,
                        "average_response_time": 100.0,
                        "median_response_time": 95.0,
                        "percentile_90": 150.0,
                        "percentile_95": 180.0,
                        "percentile_99": 200.0,
                        "min_response_time": 50.0,
                        "max_response_time": 250.0,
                        "throughput": 10.0
                    },
                    "Endpoint 2": {
                        "total_samples": 100,
                        "error_count": 5,
                        "error_rate": 5.0,
                        "average_response_time": 200.0,
                        "median_response_time": 190.0,
                        "percentile_90": 300.0,
                        "percentile_95": 350.0,
                        "percentile_99": 400.0,
                        "min_response_time": 100.0,
                        "max_response_time": 450.0,
                        "throughput": 8.0
                    },
                    "Endpoint 3": {
                        "total_samples": 100,
                        "error_count": 10,
                        "error_rate": 10.0,
                        "average_response_time": 300.0,
                        "median_response_time": 280.0,
                        "percentile_90": 450.0,
                        "percentile_95": 500.0,
                        "percentile_99": 600.0,
                        "min_response_time": 150.0,
                        "max_response_time": 700.0,
                        "throughput": 5.0
                    }
                },
                "bottlenecks": {
                    "slow_endpoints": [
                        {
                            "endpoint": "Endpoint 3",
                            "response_time": 300.0,
                            "threshold": 200.0,
                            "severity": "high"
                        }
                    ],
                    "error_prone_endpoints": [
                        {
                            "endpoint": "Endpoint 3",
                            "error_rate": 10.0,
                            "threshold": 5.0,
                            "severity": "medium"
                        }
                    ]
                },
                "insights": {
                    "recommendations": [
                        {
                            "issue": "High response time in Endpoint 3",
                            "recommendation": "Optimize database queries",
                            "expected_impact": "Reduced response time",
                            "implementation_difficulty": "medium",
                            "priority_level": "high"
                        }
                    ],
                    "scaling_insights": [
                        {
                            "topic": "Concurrency Impact",
                            "description": "Performance degrades with increasing concurrency"
                        }
                    ]
                }
            }
        }
    
    def test_create_time_series_graph(self):
        """Test creating a time series graph."""
        # Test with default parameters
        graph = self.engine.create_time_series_graph(self.time_series_metrics)
        self.assertIsNotNone(graph)
        self.assertEqual(graph["type"], "time_series")
        
        # Test with output file
        output_file = "time_series.txt"
        output_path = self.engine.create_time_series_graph(
            self.time_series_metrics, output_file=output_file)
        self.assertTrue(os.path.exists(output_path))
        
        # Test with different metric
        graph = self.engine.create_time_series_graph(
            self.time_series_metrics, metric_name="throughput")
        self.assertIsNotNone(graph)
        self.assertEqual(graph["y_label"], "Throughput (requests/second)")
        
        # Test with empty metrics
        with self.assertRaises(ValueError):
            self.engine.create_time_series_graph([])
        
        # Test with invalid metric name
        with self.assertRaises(ValueError):
            self.engine.create_time_series_graph(
                self.time_series_metrics, metric_name="invalid_metric")
    
    def test_create_distribution_graph(self):
        """Test creating a distribution graph."""
        # Test with default parameters
        graph = self.engine.create_distribution_graph(self.response_times)
        self.assertIsNotNone(graph)
        self.assertEqual(graph["type"], "distribution")
        
        # Test with output file
        output_file = "distribution.txt"
        output_path = self.engine.create_distribution_graph(
            self.response_times, output_file=output_file)
        self.assertTrue(os.path.exists(output_path))
        
        # Test with custom percentiles
        graph = self.engine.create_distribution_graph(
            self.response_times, percentiles=[25, 50, 75])
        self.assertIsNotNone(graph)
        for percentile in (25, 50, 75):
            self.assertIn(percentile, graph["percentiles"])
        
        # Test with empty response times
        with self.assertRaises(ValueError):
            self.engine.create_distribution_graph([])
    
    def test_create_endpoint_comparison_chart(self):
        """Test creating an endpoint comparison chart."""
        # Test with default parameters
        chart = self.engine.create_endpoint_comparison_chart(self.endpoint_metrics)
        self.assertIsNotNone(chart)
        self.assertEqual(chart["type"], "endpoint_comparison")
        
        # Test with output file
        output_file = "comparison.txt"
        output_path = self.engine.create_endpoint_comparison_chart(
            self.endpoint_metrics, output_file=output_file)
        self.assertTrue(os.path.exists(output_path))
        
        # Test with different metric
        chart = self.engine.create_endpoint_comparison_chart(
            self.endpoint_metrics, metric_name="error_rate")
        self.assertIsNotNone(chart)
        self.assertEqual(chart["x_label"], "Error Rate (%)")
        
        # Test with empty metrics
        with self.assertRaises(ValueError):
            self.engine.create_endpoint_comparison_chart({})
        
        # Test with invalid metric name
        with self.assertRaises(ValueError):
            self.engine.create_endpoint_comparison_chart(
                self.endpoint_metrics, metric_name="invalid_metric")
    
    def test_create_html_report(self):
        """Test creating an HTML report."""
        output_file = "report.html"
        output_path = self.engine.create_html_report(
            self.analysis_results, output_file=output_file)
        
        # Check that the file exists
        self.assertTrue(os.path.exists(output_path))
        
        # Read the report fully, then check that it contains expected content
        with open(output_path, 'r') as f:
            content = f.read()
        self.assertIn("JMeter Test Results Analysis", content)
        self.assertIn("Endpoint Analysis", content)
        self.assertIn("Bottleneck Analysis", content)
        self.assertIn("Insights and Recommendations", content)
    
    def test_figure_to_base64(self):
        """Test converting a figure to base64."""
        graph = self.engine.create_time_series_graph(self.time_series_metrics)
        base64_str = self.engine.figure_to_base64(graph)
        
        # Check that the result is a non-empty string
        self.assertIsInstance(base64_str, str)
        self.assertTrue(len(base64_str) > 0)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
```
--------------------------------------------------------------------------------
/analyzer/analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Main analyzer module for JMeter test results.
This module provides the main entry point for analyzing JMeter test results.
It orchestrates the flow of data through the various components of the analyzer.
"""
from pathlib import Path
from datetime import timedelta
from typing import Dict, List, Optional, Union
from analyzer.models import TestResults, OverallMetrics, Bottleneck
from analyzer.parser.base import JTLParser
from analyzer.parser.xml_parser import XMLJTLParser
from analyzer.parser.csv_parser import CSVJTLParser
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.insights.generator import InsightsGenerator
class TestResultsAnalyzer:
    """Coordinates parsing, metrics, bottleneck detection and insights.
    
    The analyzer keeps a registry of JTL parsers keyed by format name and
    delegates the actual number crunching to a MetricsCalculator, a
    BottleneckAnalyzer and an InsightsGenerator.
    """
    
    def __init__(self):
        """Create the analysis components and register the built-in parsers."""
        self.parsers = {}
        self.metrics_calculator = MetricsCalculator()
        self.bottleneck_analyzer = BottleneckAnalyzer()
        self.insights_generator = InsightsGenerator()
        
        # Formats supported out of the box; more can be added later
        # through register_parser().
        self.register_parser('xml', XMLJTLParser())
        self.register_parser('csv', CSVJTLParser())
    
    def register_parser(self, format_name: str, parser: JTLParser) -> None:
        """Add (or replace) the parser used for a given format.
        
        Args:
            format_name: Name of the format (e.g., 'xml', 'csv')
            parser: Parser instance that handles this format
        """
        self.parsers[format_name] = parser
    
    def analyze_file(self, file_path: Union[str, Path], 
                    detailed: bool = False) -> Dict:
        """Parse a JTL file and analyze its contents.
        
        Args:
            file_path: Path to the JTL file
            detailed: Whether to include detailed analysis
            
        Returns:
            Dictionary containing analysis results
            
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If no parser is registered for the detected format
                (format detection and parsing may raise errors of their own)
        """
        jtl_path = Path(file_path)
        if not jtl_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # The format is detected from the file itself; it selects the parser.
        detected_format = JTLParser.detect_format(jtl_path)
        if detected_format not in self.parsers:
            raise ValueError(f"No parser available for format: {detected_format}")
        
        parsed_results = self.parsers[detected_format].parse_file(jtl_path)
        return self._analyze_results(parsed_results, detailed)
    
    @staticmethod
    def _metric_fields(stats) -> Dict:
        """Copy the metric fields shared by overall and endpoint metrics."""
        return {
            "total_samples": stats.total_samples,
            "error_count": stats.error_count,
            "error_rate": stats.error_rate,
            "average_response_time": stats.average_response_time,
            "median_response_time": stats.median_response_time,
            "percentile_90": stats.percentile_90,
            "percentile_95": stats.percentile_95,
            "percentile_99": stats.percentile_99,
            "min_response_time": stats.min_response_time,
            "max_response_time": stats.max_response_time,
            "throughput": stats.throughput,
        }
    
    @staticmethod
    def _bottleneck_entries(bottlenecks, value_key: str) -> List[Dict]:
        """Serialize bottlenecks, storing each value under ``value_key``."""
        return [
            {
                "endpoint": item.endpoint,
                value_key: item.value,
                "threshold": item.threshold,
                "severity": item.severity,
            }
            for item in bottlenecks
        ]
    
    def _analyze_results(self, test_results: TestResults, 
                        detailed: bool = False) -> Dict:
        """Build the analysis dictionary for already-parsed test results.
        
        Args:
            test_results: TestResults object
            detailed: Whether to include detailed analysis
            
        Returns:
            Dictionary with a "summary" section and, when ``detailed`` is
            true, an additional "detailed" section
        """
        overall = self.metrics_calculator.calculate_overall_metrics(test_results)
        
        # The summary is the shared metric fields followed by the time span.
        summary = self._metric_fields(overall)
        summary["start_time"] = test_results.start_time
        summary["end_time"] = test_results.end_time
        summary["duration"] = overall.test_duration
        report = {"summary": summary}
        
        if not detailed:
            return report
        
        calculator = self.metrics_calculator
        detector = self.bottleneck_analyzer
        advisor = self.insights_generator
        
        per_endpoint = calculator.calculate_endpoint_metrics(test_results)
        
        # Time series use 5-second intervals; a ValueError from the
        # calculator simply means there is no series to report.
        try:
            series = calculator.calculate_time_series_metrics(
                test_results, interval_seconds=5)
        except ValueError:
            series = []
        
        # Bottleneck detection
        slow = detector.identify_slow_endpoints(per_endpoint)
        error_prone = detector.identify_error_prone_endpoints(per_endpoint)
        anomalies = detector.detect_anomalies(series)
        concurrency = detector.analyze_concurrency_impact(series)
        
        # Recommendations from bottlenecks and from the error analysis
        bottleneck_recs = advisor.generate_bottleneck_recommendations(slow + error_prone)
        error_recs = advisor.generate_error_recommendations(
            self._create_error_analysis(test_results))
        scaling = advisor.generate_scaling_insights(concurrency)
        ranked = advisor.prioritize_recommendations(bottleneck_recs + error_recs)
        
        report["detailed"] = {
            "samples_count": len(test_results.samples),
            "endpoints": {
                name: self._metric_fields(stats)
                for name, stats in per_endpoint.items()
            },
            "time_series": [
                {
                    "timestamp": point.timestamp.isoformat(),
                    "active_threads": point.active_threads,
                    "throughput": point.throughput,
                    "average_response_time": point.average_response_time,
                    "error_rate": point.error_rate,
                }
                for point in series
            ],
            "bottlenecks": {
                "slow_endpoints": self._bottleneck_entries(slow, "response_time"),
                "error_prone_endpoints": self._bottleneck_entries(error_prone, "error_rate"),
                "anomalies": [
                    {
                        "timestamp": item.timestamp.isoformat(),
                        "expected_value": item.expected_value,
                        "actual_value": item.actual_value,
                        "deviation_percentage": item.deviation_percentage,
                    }
                    for item in anomalies
                ],
                "concurrency_impact": concurrency,
            },
            "insights": {
                "recommendations": [
                    {
                        "issue": entry["recommendation"].issue,
                        "recommendation": entry["recommendation"].recommendation,
                        "expected_impact": entry["recommendation"].expected_impact,
                        "implementation_difficulty": entry["recommendation"].implementation_difficulty,
                        "priority_level": entry["priority_level"],
                    }
                    for entry in ranked
                ],
                "scaling_insights": [
                    {"topic": note.topic, "description": note.description}
                    for note in scaling
                ],
            },
        }
        return report
    
    def _create_error_analysis(self, test_results: TestResults) -> Dict:
        """Summarize failed samples by error type and by spikes over time.
        
        Args:
            test_results: TestResults object
            
        Returns:
            Dictionary with "error_types" (message -> occurrence count) and
            "error_patterns" (list of detected error spikes)
        """
        failures = [entry for entry in test_results.samples if not entry.success]
        if not failures:
            return {"error_types": {}, "error_patterns": []}
        
        # Tally failures per message; samples without a message are
        # labelled with their response code instead.
        error_types = {}
        for failed in failures:
            label = failed.error_message or f"HTTP {failed.response_code}"
            error_types[label] = error_types.get(label, 0) + 1
        
        error_patterns = []
        started = test_results.start_time
        ended = test_results.end_time
        
        # Spike detection needs both ends of the test's time span.
        if started and ended:
            bucket_seconds = 5
            span_seconds = (ended - started).total_seconds()
            bucket_count = int(span_seconds / bucket_seconds) + 1
            
            # Distribute the failures over fixed 5-second buckets.
            errors_per_bucket = [0] * bucket_count
            for failed in failures:
                offset_seconds = (failed.timestamp - started).total_seconds()
                bucket = int(offset_seconds / bucket_seconds)
                if 0 <= bucket < bucket_count:
                    errors_per_bucket[bucket] += 1
            
            if errors_per_bucket:
                mean_errors = sum(errors_per_bucket) / len(errors_per_bucket)
            else:
                mean_errors = 0
            
            # A spike is a bucket holding more than twice the mean number of
            # errors, and more than a single error.
            error_patterns = [
                {
                    "type": "spike",
                    "timestamp": (started + timedelta(seconds=bucket * bucket_seconds)).isoformat(),
                    "error_count": bucket_errors,
                }
                for bucket, bucket_errors in enumerate(errors_per_bucket)
                if bucket_errors > 2 * mean_errors and bucket_errors > 1
            ]
        
        return {
            "error_types": error_types,
            "error_patterns": error_patterns
        }
```
--------------------------------------------------------------------------------
/analyzer/insights/generator.py:
--------------------------------------------------------------------------------
```python
"""
Insights generator for JMeter test results.
This module provides functionality for generating insights and recommendations
based on JMeter test results analysis, including bottleneck recommendations,
error pattern analysis, scaling insights, and recommendation prioritization.
"""
from typing import Dict, List, Optional, Tuple, Union
from analyzer.models import (Bottleneck, EndpointMetrics, Insight,
                           OverallMetrics, Recommendation, TestResults)
class InsightsGenerator:
    """Generator for insights and recommendations based on test results analysis.

    The class keeps no instance state: every public method derives its result
    solely from the arguments it receives.
    """

    # Upper bound on endpoint names quoted in one bottleneck issue line.
    _MAX_ENDPOINTS_PER_ISSUE = 3
    # Upper bound on distinct error types that receive a recommendation.
    _MAX_ERROR_TYPES = 3

    # One row per bottleneck recommendation, listed in emission order:
    # (metric_type, severity, issue prefix, recommendation, expected impact, difficulty)
    _BOTTLENECK_ADVICE = (
        (
            "response_time", "high",
            "Critical response time issues in endpoints",
            "Optimize database queries, add caching, or consider asynchronous processing for these endpoints",
            "Significant reduction in response times and improved user experience",
            "medium",
        ),
        (
            "response_time", "medium",
            "Moderate response time issues in endpoints",
            "Profile the code to identify bottlenecks and optimize the most expensive operations",
            "Moderate improvement in response times",
            "medium",
        ),
        (
            "error_rate", "high",
            "High error rates in endpoints",
            "Investigate error logs, add proper error handling, and fix the root causes of errors",
            "Significant reduction in error rates and improved reliability",
            "high",
        ),
        (
            "error_rate", "medium",
            "Moderate error rates in endpoints",
            "Review error handling and add appropriate validation and error recovery mechanisms",
            "Moderate reduction in error rates",
            "medium",
        ),
    )

    # Error category -> (issue label, recommendation, expected impact, difficulty)
    _ERROR_TYPE_ADVICE = {
        "timeout": (
            "Timeout",
            "Increase timeout thresholds, optimize slow operations, or implement circuit breakers",
            "Reduction in timeout errors and improved reliability",
            "medium",
        ),
        "connection": (
            "Connection",
            "Implement connection pooling, retry mechanisms, or check network configuration",
            "Improved connection stability and reduced errors",
            "medium",
        ),
        "server": (
            "Server",
            "Check server logs, fix application bugs, and add proper error handling",
            "Reduction in server errors and improved reliability",
            "high",
        ),
        "client": (
            "Client",
            "Validate input data, fix client-side issues, and improve error messages",
            "Reduction in client errors and improved user experience",
            "medium",
        ),
    }

    # Error pattern type -> (issue, recommendation, expected impact, difficulty)
    _PATTERN_ADVICE = {
        "spike": (
            "Error spike detected during the test",
            "Investigate what happened during the spike period and address the underlying cause",
            "Prevention of error spikes in production",
            "medium",
        ),
        "increasing": (
            "Increasing error rate over time",
            "Check for resource leaks, memory issues, or degrading performance under load",
            "Stable error rates during extended usage",
            "high",
        ),
    }

    # Scoring tables used by prioritize_recommendations. Easier work scores
    # higher so that quick wins are ranked above equally severe hard fixes.
    _SEVERITY_SCORES = {"high": 3, "medium": 2, "low": 1}
    _DIFFICULTY_SCORES = {"low": 3, "medium": 2, "high": 1}

    def generate_bottleneck_recommendations(self, bottlenecks: List[Bottleneck]) -> List[Recommendation]:
        """Generate recommendations for addressing identified bottlenecks.

        Bottlenecks are grouped by ``metric_type`` ("response_time" or
        "error_rate") and ``severity`` ("high" or "medium"). Each non-empty
        group yields one recommendation naming at most the first three
        endpoints of that group. Bottlenecks with any other metric type or
        severity are ignored.

        Args:
            bottlenecks: List of Bottleneck objects

        Returns:
            List of Recommendation objects, ordered response-time/high,
            response-time/medium, error-rate/high, error-rate/medium
        """
        recommendations = []

        for metric_type, severity, issue_prefix, advice, impact, difficulty in self._BOTTLENECK_ADVICE:
            group = [
                b for b in bottlenecks
                if b.metric_type == metric_type and b.severity == severity
            ]
            if not group:
                continue

            endpoints = ", ".join(b.endpoint for b in group[:self._MAX_ENDPOINTS_PER_ISSUE])
            recommendations.append(Recommendation(
                issue=f"{issue_prefix}: {endpoints}",
                recommendation=advice,
                expected_impact=impact,
                implementation_difficulty=difficulty
            ))

        return recommendations

    def generate_error_recommendations(self, error_analysis: Dict) -> List[Recommendation]:
        """Generate recommendations for addressing error patterns.

        The three most frequent entries of ``error_analysis["error_types"]``
        each produce one recommendation. Every distinct recognised pattern
        type ("spike", "increasing") in ``error_analysis["error_patterns"]``
        produces one recommendation, however many times it occurs, so that
        repeated spikes do not flood the result with identical rows.

        Args:
            error_analysis: Dictionary containing error analysis results

        Returns:
            List of Recommendation objects (error types first, then patterns)
        """
        recommendations = []

        # Process error types, most frequent first. sorted() is stable, so
        # equally frequent types keep their dictionary order.
        error_types = error_analysis.get("error_types") or {}
        by_frequency = sorted(error_types.items(), key=lambda item: item[1], reverse=True)
        for error_type, count in by_frequency[:self._MAX_ERROR_TYPES]:
            recommendations.append(self._recommend_for_error_type(error_type, count))

        # Process error patterns, one recommendation per distinct pattern type.
        handled_patterns = set()
        for pattern in error_analysis.get("error_patterns") or []:
            pattern_type = pattern.get("type", "")
            advice = self._PATTERN_ADVICE.get(pattern_type)
            if advice is None or pattern_type in handled_patterns:
                continue
            handled_patterns.add(pattern_type)

            issue, recommendation, impact, difficulty = advice
            recommendations.append(Recommendation(
                issue=issue,
                recommendation=recommendation,
                expected_impact=impact,
                implementation_difficulty=difficulty
            ))

        return recommendations

    def _recommend_for_error_type(self, error_type, count) -> Recommendation:
        """Build the recommendation for one error type and its occurrence count."""
        # Keys may be numeric status codes; compare on their text form.
        label = str(error_type)
        lowered = label.lower()

        if "timeout" in lowered:
            category = "timeout"
        elif "connection" in lowered:
            category = "connection"
        elif "500" in label or "server" in lowered:
            category = "server"
        elif "400" in label or "client" in lowered:
            category = "client"
        else:
            # Unrecognised error type: report it under its own name.
            return Recommendation(
                issue=f"{error_type} errors ({count} occurrences)",
                recommendation="Investigate the root cause and implement appropriate error handling",
                expected_impact="Reduction in errors and improved reliability",
                implementation_difficulty="medium"
            )

        issue_label, recommendation, impact, difficulty = self._ERROR_TYPE_ADVICE[category]
        return Recommendation(
            issue=f"{issue_label} errors ({count} occurrences)",
            recommendation=recommendation,
            expected_impact=impact,
            implementation_difficulty=difficulty
        )

    def generate_scaling_insights(self, concurrency_analysis: Dict) -> List[Insight]:
        """Generate insights on scaling behavior and capacity limits.

        Reads the keys "correlation", "has_degradation" and
        "degradation_threshold" (defaulting to 0, False and 0). A correlation
        between 0.2 and 0.5, or at or below -0.2, produces no correlation
        insight.

        Args:
            concurrency_analysis: Dictionary containing concurrency analysis results

        Returns:
            List of Insight objects
        """
        insights = []

        correlation = concurrency_analysis.get("correlation", 0)
        has_degradation = concurrency_analysis.get("has_degradation", False)
        degradation_threshold = concurrency_analysis.get("degradation_threshold", 0)

        # Pick the correlation headline, if the value falls in a described band.
        if correlation > 0.8:
            headline = (
                "Strong Correlation with Concurrency",
                "There is a strong correlation between the number of concurrent users and response times, indicating potential scalability issues",
            )
        elif correlation > 0.5:
            headline = (
                "Moderate Correlation with Concurrency",
                "There is a moderate correlation between the number of concurrent users and response times, suggesting some scalability concerns",
            )
        elif -0.2 < correlation < 0.2:
            headline = (
                "No Correlation with Concurrency",
                "There is little to no correlation between the number of concurrent users and response times, suggesting good scalability",
            )
        else:
            headline = None

        if headline is not None:
            topic, description = headline
            insights.append(Insight(
                topic=topic,
                description=description,
                supporting_data={"correlation": correlation}
            ))

        if not has_degradation:
            insights.append(Insight(
                topic="No Performance Degradation Detected",
                description="No significant performance degradation was detected with increasing concurrent users within the tested range",
                supporting_data={}
            ))
            return insights

        insights.append(Insight(
            topic="Performance Degradation Threshold",
            description=f"Performance begins to degrade significantly at {degradation_threshold} concurrent users, indicating a potential capacity limit",
            supporting_data={"degradation_threshold": degradation_threshold}
        ))

        # A scaling suggestion only makes sense for a positive threshold.
        if degradation_threshold > 0:
            insights.append(Insight(
                topic="Scaling Recommendation",
                description=f"Consider horizontal scaling or optimization before reaching {degradation_threshold} concurrent users to maintain performance",
                supporting_data={"degradation_threshold": degradation_threshold}
            ))

        return insights

    def prioritize_recommendations(self, recommendations: List[Recommendation]) -> List[Dict]:
        """Prioritize recommendations based on potential impact.

        Severity is inferred from the wording of each recommendation's
        ``issue`` and combined with its ``implementation_difficulty``:
        ``priority_score = severity_score * 2 + ease_score``, so severity
        weighs more than ease of implementation.

        Args:
            recommendations: List of Recommendation objects

        Returns:
            List of dictionaries with the keys "recommendation",
            "priority_score" and "priority_level", sorted by score in
            descending order (ties keep their input order)
        """
        if not recommendations:
            return []

        prioritized = []
        for recommendation in recommendations:
            severity = self._infer_severity(recommendation.issue)
            difficulty = recommendation.implementation_difficulty

            # Unknown severity or difficulty values fall back to the medium score.
            severity_score = self._SEVERITY_SCORES.get(severity, 2)
            difficulty_score = self._DIFFICULTY_SCORES.get(difficulty, 2)
            priority_score = severity_score * 2 + difficulty_score

            prioritized.append({
                "recommendation": recommendation,
                "priority_score": priority_score,
                "priority_level": self._get_priority_level(priority_score)
            })

        # list.sort is stable: equal scores stay in their original order.
        prioritized.sort(key=lambda entry: entry["priority_score"], reverse=True)

        return prioritized

    @staticmethod
    def _infer_severity(issue: str) -> str:
        """Derive "high", "medium" or "low" from the wording of an issue line."""
        text = issue.lower()
        if "critical" in text:
            return "high"
        # generate_bottleneck_recommendations words its high-severity error-rate
        # issue as "High error rates ...". Anchor on the start of the line so an
        # endpoint name that merely contains "high" is not misread as severity.
        if text.startswith("high "):
            return "high"
        if "moderate" in text:
            return "medium"
        if "minor" in text:
            return "low"
        return "medium"  # Default when the wording gives no hint

    def _get_priority_level(self, score: int) -> str:
        """Convert a priority score to a priority level.

        Scores computed by prioritize_recommendations range from 3 to 9, so
        that method never produces "low"; the branch is only reachable when
        this helper is called directly with a score below 3.

        Args:
            score: Priority score

        Returns:
            Priority level string: "critical" (>= 7), "high" (>= 5),
            "medium" (>= 3) or "low"
        """
        if score >= 7:
            return "critical"
        if score >= 5:
            return "high"
        if score >= 3:
            return "medium"
        return "low"
```
--------------------------------------------------------------------------------
/analyzer/visualization/engine.py:
--------------------------------------------------------------------------------
```python
"""
Visualization engine for JMeter test results.
This module provides functionality for creating visual representations
of JMeter test results analysis, including time series graphs, distribution
graphs, endpoint comparison charts, and visualization output formats.
"""
import base64
import io
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# Note: In a real implementation, we would use matplotlib for visualization.
# However, for the purpose of this implementation, we'll create a simplified version
# that doesn't rely on external libraries.
class VisualizationEngine:
    """Engine for creating visual representations of test results analysis."""
    
    def __init__(self, output_dir: Optional[str] = None):
        """Initialize the visualization engine.
        
        Args:
            output_dir: Directory to save visualization files (default: None)
        """
        self.output_dir = output_dir
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
    
    def create_time_series_graph(self, time_series_metrics: List,
                                metric_name: str = "average_response_time",
                                title: Optional[str] = None,
                                output_file: Optional[str] = None) -> Union[str, Dict]:
        """Create a time-series graph showing performance over the test duration.

        No image is rendered. The graph is either returned as a plain
        dictionary or written as a UTF-8 text file.

        Args:
            time_series_metrics: List of TimeSeriesMetrics objects; each must
                expose ``timestamp`` and the attribute named by ``metric_name``
            metric_name: Name of the metric to plot (default: "average_response_time");
                one of "average_response_time", "throughput", "error_rate",
                "active_threads"
            title: Graph title (default: None, which selects a per-metric title)
            output_file: Path to save the graph (default: None)

        Returns:
            The path of the written file when ``output_file`` is given,
            otherwise a dictionary with the keys "type", "title", "x_label",
            "y_label", "timestamps" (ISO 8601 strings) and "values"

        Raises:
            ValueError: If no metrics are provided or ``metric_name`` is unknown
        """
        if not time_series_metrics:
            raise ValueError("No time series metrics provided")

        # metric attribute -> (y-axis label, default title)
        supported_metrics = {
            "average_response_time": ("Response Time (ms)", "Response Time Over Time"),
            "throughput": ("Throughput (requests/second)", "Throughput Over Time"),
            "error_rate": ("Error Rate (%)", "Error Rate Over Time"),
            "active_threads": ("Active Threads", "Active Threads Over Time"),
        }
        if metric_name not in supported_metrics:
            raise ValueError(f"Unknown metric name: {metric_name}")

        y_label, default_title = supported_metrics[metric_name]
        graph_title = title or default_title

        # Extract data
        timestamps = [metrics.timestamp.isoformat() for metrics in time_series_metrics]
        values = [getattr(metrics, metric_name) for metrics in time_series_metrics]

        if output_file:
            output_path = self._get_output_path(output_file)
            lines = [
                f"Time Series Graph: {graph_title}\n",
                "X-axis: Time\n",
                f"Y-axis: {y_label}\n",
                "Data:\n",
            ]
            lines.extend(f"{ts}: {val}\n" for ts, val in zip(timestamps, values))
            # Explicit encoding: a non-ASCII title must not depend on the
            # platform's locale default.
            with open(output_path, 'w', encoding="utf-8") as f:
                f.writelines(lines)
            return output_path

        # Simple dictionary representation of the graph
        return {
            "type": "time_series",
            "title": graph_title,
            "x_label": "Time",
            "y_label": y_label,
            "timestamps": timestamps,
            "values": values
        }
    
    def create_distribution_graph(self, response_times: List[float],
                                percentiles: Optional[List[int]] = None,
                                title: Optional[str] = None,
                                output_file: Optional[str] = None) -> Union[str, Dict]:
        """Create a distribution graph showing response time distributions.

        No image is rendered. The graph is either returned as a plain
        dictionary or written as a UTF-8 text file listing the percentiles.

        Args:
            response_times: List of response times
            percentiles: List of percentiles to mark (default: None, meaning
                [50, 90, 95, 99]); an explicit empty list marks none
            title: Graph title (default: None, meaning "Response Time Distribution")
            output_file: Path to save the graph (default: None)

        Returns:
            The path of the written file when ``output_file`` is given,
            otherwise a dictionary with the keys "type", "title", "x_label",
            "y_label", "response_times" and "percentiles" (percentile -> value)

        Raises:
            ValueError: If ``response_times`` is empty
        """
        if not response_times:
            raise ValueError("No response times provided")

        # A None sentinel replaces the former list default so that no mutable
        # object is shared between calls.
        if percentiles is None:
            percentiles = [50, 90, 95, 99]

        # Calculate percentile values
        percentile_values = {
            p: self._calculate_percentile(response_times, p) for p in percentiles
        }
        graph_title = title or "Response Time Distribution"

        if output_file:
            output_path = self._get_output_path(output_file)
            lines = [
                f"Distribution Graph: {graph_title}\n",
                "X-axis: Response Time (ms)\n",
                "Y-axis: Frequency\n",
                "Percentiles:\n",
            ]
            lines.extend(
                f"{p}th Percentile: {val:.2f} ms\n" for p, val in percentile_values.items()
            )
            # Explicit encoding: a non-ASCII title must not depend on the
            # platform's locale default.
            with open(output_path, 'w', encoding="utf-8") as f:
                f.writelines(lines)
            return output_path

        # Simple dictionary representation of the graph
        return {
            "type": "distribution",
            "title": graph_title,
            "x_label": "Response Time (ms)",
            "y_label": "Frequency",
            "response_times": response_times,
            "percentiles": percentile_values
        }
    
    def create_endpoint_comparison_chart(self, endpoint_metrics: Dict,
                                        metric_name: str = "average_response_time",
                                        top_n: int = 10,
                                        title: Optional[str] = None,
                                        output_file: Optional[str] = None) -> Union[str, Dict]:
        """Create a comparison chart for different endpoints.

        No image is rendered. The chart is either returned as a plain
        dictionary or written as a UTF-8 text file. Endpoints are ranked by
        the chosen metric, highest first, and cut to the first ``top_n``.

        Args:
            endpoint_metrics: Dictionary mapping endpoint names to EndpointMetrics objects
            metric_name: Name of the metric to compare (default: "average_response_time");
                one of "average_response_time", "error_rate", "throughput"
            top_n: Number of top endpoints to include (default: 10)
            title: Chart title (default: None, which selects a per-metric title)
            output_file: Path to save the chart (default: None)

        Returns:
            The path of the written file when ``output_file`` is given,
            otherwise a dictionary with the keys "type", "title", "x_label",
            "y_label", "endpoints" and "values"

        Raises:
            ValueError: If no endpoint metrics are provided or ``metric_name``
                is unknown
        """
        if not endpoint_metrics:
            raise ValueError("No endpoint metrics provided")

        # metric attribute -> (value-axis label, default title)
        supported_metrics = {
            "average_response_time": ("Average Response Time (ms)", "Endpoint Response Time Comparison"),
            "error_rate": ("Error Rate (%)", "Endpoint Error Rate Comparison"),
            "throughput": ("Throughput (requests/second)", "Endpoint Throughput Comparison"),
        }
        if metric_name not in supported_metrics:
            raise ValueError(f"Unknown metric name: {metric_name}")

        value_label, default_title = supported_metrics[metric_name]
        chart_title = title or default_title

        # Rank endpoints by value (descending) and take top N. sorted() is
        # stable, so endpoints with equal values keep their dictionary order.
        ranked = sorted(
            ((endpoint, getattr(metrics, metric_name)) for endpoint, metrics in endpoint_metrics.items()),
            key=lambda pair: pair[1],
            reverse=True,
        )[:top_n]
        endpoints = [endpoint for endpoint, _ in ranked]
        values_list = [value for _, value in ranked]

        if output_file:
            output_path = self._get_output_path(output_file)
            lines = [
                f"Endpoint Comparison Chart: {chart_title}\n",
                f"X-axis: {value_label}\n",
                "Y-axis: Endpoint\n",
                "Data:\n",
            ]
            lines.extend(f"{endpoint}: {value:.2f}\n" for endpoint, value in ranked)
            # Explicit encoding: non-ASCII endpoint names or titles must not
            # depend on the platform's locale default.
            with open(output_path, 'w', encoding="utf-8") as f:
                f.writelines(lines)
            return output_path

        # Simple dictionary representation of the chart. The metric goes on
        # the x-axis and the endpoints on the y-axis.
        return {
            "type": "endpoint_comparison",
            "title": chart_title,
            "x_label": value_label,
            "y_label": "Endpoint",
            "endpoints": endpoints,
            "values": values_list
        }
    
    def create_html_report(self, analysis_results: Dict, output_file: str) -> str:
        """Create an HTML report from analysis results.
        
        Args:
            analysis_results: Dictionary containing analysis results
            output_file: Path to save the HTML report
            
        Returns:
            Path to the saved HTML report
        """
        # Extract data
        summary = analysis_results.get("summary", {})
        detailed = analysis_results.get("detailed", {})
        
        # Create HTML content
        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>JMeter Test Results Analysis</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                h1, h2, h3 {{ color: #333; }}
                table {{ border-collapse: collapse; width: 100%; margin-bottom: 20px; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
                tr:nth-child(even) {{ background-color: #f9f9f9; }}
                .chart {{ margin: 20px 0; max-width: 100%; }}
                .section {{ margin-bottom: 30px; }}
                .severity-high {{ color: #d9534f; }}
                .severity-medium {{ color: #f0ad4e; }}
                .severity-low {{ color: #5bc0de; }}
            </style>
        </head>
        <body>
            <h1>JMeter Test Results Analysis</h1>
            
            <div class="section">
                <h2>Summary</h2>
                <table>
                    <tr><th>Metric</th><th>Value</th></tr>
                    <tr><td>Total Samples</td><td>{summary.get('total_samples', 'N/A')}</td></tr>
                    <tr><td>Error Count</td><td>{summary.get('error_count', 'N/A')}</td></tr>
                    <tr><td>Error Rate</td><td>{summary.get('error_rate', 'N/A'):.2f}%</td></tr>
                    <tr><td>Average Response Time</td><td>{summary.get('average_response_time', 'N/A'):.2f} ms</td></tr>
                    <tr><td>Median Response Time</td><td>{summary.get('median_response_time', 'N/A'):.2f} ms</td></tr>
                    <tr><td>90th Percentile</td><td>{summary.get('percentile_90', 'N/A'):.2f} ms</td></tr>
                    <tr><td>95th Percentile</td><td>{summary.get('percentile_95', 'N/A'):.2f} ms</td></tr>
                    <tr><td>99th Percentile</td><td>{summary.get('percentile_99', 'N/A'):.2f} ms</td></tr>
                    <tr><td>Min Response Time</td><td>{summary.get('min_response_time', 'N/A'):.2f} ms</td></tr>
                    <tr><td>Max Response Time</td><td>{summary.get('max_response_time', 'N/A'):.2f} ms</td></tr>
                    <tr><td>Throughput</td><td>{summary.get('throughput', 'N/A'):.2f} requests/second</td></tr>
                    <tr><td>Start Time</td><td>{summary.get('start_time', 'N/A')}</td></tr>
                    <tr><td>End Time</td><td>{summary.get('end_time', 'N/A')}</td></tr>
                    <tr><td>Duration</td><td>{summary.get('duration', 'N/A'):.2f} seconds</td></tr>
                </table>
            </div>
        """
        
        # Add detailed information if available
        if detailed:
            # Add endpoint information
            endpoints = detailed.get("endpoints", {})
            if endpoints:
                html_content += """
                <div class="section">
                    <h2>Endpoint Analysis</h2>
                    <table>
                        <tr>
                            <th>Endpoint</th>
                            <th>Samples</th>
                            <th>Errors</th>
                            <th>Error Rate</th>
                            <th>Avg Response Time</th>
                            <th>95th Percentile</th>
                            <th>Throughput</th>
                        </tr>
                """
                
                for endpoint, metrics in endpoints.items():
                    html_content += f"""
                        <tr>
                            <td>{endpoint}</td>
                            <td>{metrics['total_samples']}</td>
                            <td>{metrics['error_count']}</td>
                            <td>{metrics['error_rate']:.2f}%</td>
                            <td>{metrics['average_response_time']:.2f} ms</td>
                            <td>{metrics['percentile_95']:.2f} ms</td>
                            <td>{metrics['throughput']:.2f} req/s</td>
                        </tr>
                    """
                
                html_content += """
                    </table>
                </div>
                """
            
            # Add bottleneck information
            bottlenecks = detailed.get("bottlenecks", {})
            if bottlenecks:
                html_content += """
                <div class="section">
                    <h2>Bottleneck Analysis</h2>
                """
                
                # Slow endpoints
                slow_endpoints = bottlenecks.get("slow_endpoints", [])
                if slow_endpoints:
                    html_content += """
                    <h3>Slow Endpoints</h3>
                    <table>
                        <tr>
                            <th>Endpoint</th>
                            <th>Response Time</th>
                            <th>Threshold</th>
                            <th>Severity</th>
                        </tr>
                    """
                    
                    for endpoint in slow_endpoints:
                        severity_class = f"severity-{endpoint.get('severity', 'medium')}"
                        html_content += f"""
                            <tr>
                                <td>{endpoint.get('endpoint')}</td>
                                <td>{endpoint.get('response_time', 'N/A'):.2f} ms</td>
                                <td>{endpoint.get('threshold', 'N/A'):.2f} ms</td>
                                <td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td>
                            </tr>
                        """
                    
                    html_content += """
                    </table>
                    """
                
                # Error-prone endpoints
                error_endpoints = bottlenecks.get("error_prone_endpoints", [])
                if error_endpoints:
                    html_content += """
                    <h3>Error-Prone Endpoints</h3>
                    <table>
                        <tr>
                            <th>Endpoint</th>
                            <th>Error Rate</th>
                            <th>Threshold</th>
                            <th>Severity</th>
                        </tr>
                    """
                    
                    for endpoint in error_endpoints:
                        severity_class = f"severity-{endpoint.get('severity', 'medium')}"
                        html_content += f"""
                            <tr>
                                <td>{endpoint.get('endpoint')}</td>
                                <td>{endpoint.get('error_rate', 'N/A'):.2f}%</td>
                                <td>{endpoint.get('threshold', 'N/A'):.2f}%</td>
                                <td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td>
                            </tr>
                        """
                    
                    html_content += """
                    </table>
                    """
                
                html_content += """
                </div>
                """
            
            # Add insights and recommendations
            insights = detailed.get("insights", {})
            if insights:
                html_content += """
                <div class="section">
                    <h2>Insights and Recommendations</h2>
                """
                
                # Recommendations
                recommendations = insights.get("recommendations", [])
                if recommendations:
                    html_content += """
                    <h3>Recommendations</h3>
                    <table>
                        <tr>
                            <th>Priority</th>
                            <th>Issue</th>
                            <th>Recommendation</th>
                            <th>Expected Impact</th>
                        </tr>
                    """
                    
                    for rec in recommendations:
                        priority_level = rec.get('priority_level', 'medium')
                        severity_class = f"severity-{priority_level}"
                        html_content += f"""
                            <tr>
                                <td class="{severity_class}">{priority_level.upper()}</td>
                                <td>{rec.get('issue')}</td>
                                <td>{rec.get('recommendation')}</td>
                                <td>{rec.get('expected_impact')}</td>
                            </tr>
                        """
                    
                    html_content += """
                    </table>
                    """
                
                # Scaling insights
                scaling_insights = insights.get("scaling_insights", [])
                if scaling_insights:
                    html_content += """
                    <h3>Scaling Insights</h3>
                    <table>
                        <tr>
                            <th>Topic</th>
                            <th>Description</th>
                        </tr>
                    """
                    
                    for insight in scaling_insights:
                        html_content += f"""
                            <tr>
                                <td>{insight.get('topic')}</td>
                                <td>{insight.get('description')}</td>
                            </tr>
                        """
                    
                    html_content += """
                    </table>
                    """
                
                html_content += """
                </div>
                """
        
        # Close HTML
        html_content += """
        </body>
        </html>
        """
        
        # Save HTML report
        output_path = self._get_output_path(output_file)
        with open(output_path, 'w') as f:
            f.write(html_content)
        
        return output_path
    
    def figure_to_base64(self, fig) -> str:
        """Return the base64 text standing in for a rendered figure.
        
        No encoding is performed in this simplified version: ``fig`` is
        ignored and a fixed placeholder string is returned.
        
        Args:
            fig: Figure object (currently unused)
            
        Returns:
            The constant placeholder string
        """
        # Stub: a full implementation would serialise the figure to bytes
        # and base64-encode them instead of returning this marker.
        placeholder = "base64_encoded_image_placeholder"
        return placeholder
    
    def _get_output_path(self, output_file: str) -> str:
        """Resolve where an output file should be written.
        
        Args:
            output_file: Output file name or path
            
        Returns:
            ``output_file`` joined onto ``self.output_dir`` when an output
            directory is configured, otherwise ``output_file`` unchanged
        """
        base_dir = self.output_dir
        # A falsy output_dir (None or "") means no directory was configured.
        if not base_dir:
            return output_file
        return os.path.join(base_dir, output_file)
    
    def _calculate_percentile(self, values: List[float], percentile: float) -> float:
        """Compute a percentile of ``values`` with linear interpolation.
        
        The values are sorted, the percentile is mapped onto a fractional
        position in ``[0, len(values) - 1]``, and the result is interpolated
        between the two neighbouring values when the position is not whole.
        
        Args:
            values: List of values
            percentile: Percentile to calculate (0-100)
            
        Returns:
            Percentile value, or 0 when ``values`` is empty
        """
        if not values:
            return 0
        
        ordered = list(values)
        ordered.sort()
        
        # Fractional rank of the requested percentile within the sorted data.
        rank = (percentile / 100) * (len(ordered) - 1)
        
        if not rank.is_integer():
            # Blend the two neighbours in proportion to the fractional part.
            below = int(rank)
            low, high = ordered[below], ordered[below + 1]
            weight = rank - below
            return low + (high - low) * weight
        
        # The rank lands exactly on an element; no interpolation needed.
        return ordered[int(rank)]
```
--------------------------------------------------------------------------------
/jmeter_server.py:
--------------------------------------------------------------------------------
```python
from typing import Any
import subprocess
from pathlib import Path
from mcp.server.fastmcp import FastMCP
import os
import datetime
import uuid
import logging
import logging
from dotenv import load_dotenv
# Configure logging: messages are emitted at INFO and above with a
# "timestamp - level - message" layout, so the logger.debug(...) calls in this
# module are not shown unless the level is lowered.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)
# Load environment variables (python-dotenv); run_jmeter reads JMETER_BIN and
# JMETER_JAVA_OPTS from the environment.
load_dotenv()
# Initialize MCP server; the @mcp.tool() decorators below register tools on it.
mcp = FastMCP("jmeter")
async def run_jmeter(test_file: str, non_gui: bool = True, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str:
    """Run a JMeter test.

    Builds a JMeter command line from the arguments and executes it. In
    non-GUI mode the call waits for JMeter to exit and returns its stdout;
    in GUI mode the process is started and left running.

    Args:
        test_file: Path to the JMeter test file (.jmx)
        non_gui: Run in non-GUI mode (default: True)
        properties: Dictionary of JMeter properties to pass with -J (default: None)
        generate_report: Whether to generate report dashboard after load test (default: False).
            Only honoured together with ``non_gui``.
        report_output_dir: Output folder for report dashboard (default: None).
            A unique suffix is always appended; a name is generated when omitted.
        log_file: Name of JTL file to log sample results to (default: None).
            A unique name is generated when omitted and a report is requested.

    Returns:
        str: JMeter execution output, or a message starting with ``Error`` /
        ``Unexpected error`` when validation or execution fails. No exception
        is propagated to the caller.
    """
    # Local import: only needed here to locate the JMeter launcher on PATH.
    import shutil

    try:
        # Convert to absolute path
        test_file_path = Path(test_file).resolve()

        # Validate file exists and is a .jmx file
        if not test_file_path.exists():
            return f"Error: Test file not found: {test_file}"
        if test_file_path.suffix != '.jmx':
            return f"Error: Invalid file type. Expected .jmx file: {test_file}"

        # Get JMeter binary path from environment
        jmeter_bin = os.getenv('JMETER_BIN', 'jmeter')
        # NOTE(review): JMETER_JAVA_OPTS is read and logged only; it is not
        # passed on to the JMeter process by this function.
        java_opts = os.getenv('JMETER_JAVA_OPTS', '')

        # Log the JMeter binary path and Java options
        logger.info(f"JMeter binary path: {jmeter_bin}")
        logger.debug(f"Java options: {java_opts}")

        # Resolve the launcher. A path that exists is made absolute (as
        # before); otherwise the name is looked up on PATH so that the default
        # bare "jmeter" works instead of being turned into "<cwd>/jmeter".
        launcher_path = Path(jmeter_bin).resolve()
        if launcher_path.exists():
            launcher = str(launcher_path)
        else:
            launcher = shutil.which(jmeter_bin) or str(launcher_path)

        # Build command
        cmd = [launcher]

        if non_gui:
            cmd.append('-n')
        cmd.extend(['-t', str(test_file_path)])

        # Add JMeter properties if provided
        if properties:
            for prop_name, prop_value in properties.items():
                cmd.append(f'-J{prop_name}={prop_value}')
                logger.debug(f"Adding property: -J{prop_name}={prop_value}")

        # Add report generation options if requested
        if generate_report and non_gui:
            # Shared by the generated log file name and the report directory
            # so both carry the same suffix when both are generated.
            unique_id = None

            if log_file is None:
                # Generate unique log file name if not specified
                unique_id = generate_unique_id()
                log_file = f"{test_file_path.stem}_{unique_id}_results.jtl"
                logger.debug(f"Using generated unique log file: {log_file}")

            cmd.extend(['-l', log_file])
            cmd.append('-e')

            # Always ensure report_output_dir is unique
            if unique_id is None:
                unique_id = generate_unique_id()

            if report_output_dir:
                # Append unique identifier to user-provided report directory
                original_dir = report_output_dir
                report_output_dir = f"{original_dir}_{unique_id}"
                logger.debug(f"Making user-provided report directory unique: {original_dir} -> {report_output_dir}")
            else:
                # Generate unique report output directory if not specified
                report_output_dir = f"{test_file_path.stem}_{unique_id}_report"
                logger.debug(f"Using generated unique report output directory: {report_output_dir}")

            cmd.extend(['-o', report_output_dir])

        # Log the full command for debugging
        logger.debug(f"Executing command: {' '.join(cmd)}")

        if not non_gui:
            # For GUI mode, start process without capturing output
            subprocess.Popen(cmd)
            return "JMeter GUI launched successfully"

        # For non-GUI mode, capture output. This call blocks until JMeter exits.
        result = subprocess.run(cmd, capture_output=True, text=True)

        # Log output for debugging
        logger.debug("Command output:")
        logger.debug(f"Return code: {result.returncode}")
        logger.debug(f"Stdout: {result.stdout}")
        logger.debug(f"Stderr: {result.stderr}")

        if result.returncode != 0:
            return f"Error executing JMeter test:\n{result.stderr}"

        return result.stdout
    except Exception as e:
        # Tool boundary: report every failure as text rather than raising.
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def execute_jmeter_test(test_file: str, gui_mode: bool = False, properties: dict = None) -> str:
    """Execute a JMeter test.

    Args:
        test_file: Path to the JMeter test file (.jmx)
        gui_mode: Whether to run in GUI mode (default: False)
        properties: Dictionary of JMeter properties to pass with -J (default: None)

    Returns:
        str: The text returned by run_jmeter (JMeter output or an error message)
    """
    # run_jmeter takes the inverse flag, so headless execution is the default.
    headless = not gui_mode
    outcome = await run_jmeter(test_file, non_gui=headless, properties=properties)
    return outcome
@mcp.tool()
async def execute_jmeter_test_non_gui(test_file: str, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str:
    """Execute a JMeter test in non-GUI mode - supports JMeter properties.

    Args:
        test_file: Path to the JMeter test file (.jmx)
        properties: Dictionary of JMeter properties to pass with -J (default: None)
        generate_report: Whether to generate report dashboard after load test (default: False)
        report_output_dir: Output folder for report dashboard (default: None)
        log_file: Name of JTL file to log sample results to (default: None)

    Returns:
        str: The text returned by run_jmeter (JMeter output or an error message)
    """
    # Always headless; every other option is forwarded unchanged.
    outcome = await run_jmeter(
        test_file,
        non_gui=True,
        properties=properties,
        generate_report=generate_report,
        report_output_dir=report_output_dir,
        log_file=log_file,
    )
    return outcome
# Import the analyzer module
from analyzer.models import TestResults
from analyzer.analyzer import TestResultsAnalyzer
from analyzer.visualization.engine import VisualizationEngine
@mcp.tool()
async def analyze_jmeter_results(jtl_file: str, detailed: bool = False) -> str:
    """Analyze JMeter test results and provide a summary of key metrics and insights.
    
    The report always contains a summary section. With ``detailed=True`` and a
    "detailed" entry in the analysis results it additionally lists per-endpoint
    metrics, bottlenecks (top 3 anomalies), the top 3 recommendations, the top 2
    scaling insights and a time-series overview.
    
    Args:
        jtl_file: Path to the JTL file containing test results
        detailed: Whether to include detailed analysis (default: False)
        
    Returns:
        str: Analysis results in a formatted string, or a message starting
        with ``Error`` when the file is missing or the analysis fails
    """
    def _fmt(value: Any) -> str:
        """Format a metric with two decimals; 'N/A' when missing or non-numeric."""
        # Applying ':.2f' directly to a missing value (None or an 'N/A'
        # default) raises and would abort the whole report.
        try:
            return format(value, ".2f")
        except (TypeError, ValueError):
            return "N/A"

    try:
        analyzer = TestResultsAnalyzer()
        
        # Validate file exists
        file_path = Path(jtl_file)
        if not file_path.exists():
            return f"Error: JTL file not found: {jtl_file}"
        
        try:
            # Analyze the file
            analysis_results = analyzer.analyze_file(file_path, detailed=detailed)
            
            # Collect the report fragments and join them once at the end
            out = [f"Analysis of {jtl_file}:\n\n"]
            
            # Add summary information
            summary = analysis_results.get("summary", {})
            out.append("Summary:\n")
            out.append(f"- Total samples: {summary.get('total_samples', 'N/A')}\n")
            out.append(f"- Error count: {summary.get('error_count', 'N/A')} ({_fmt(summary.get('error_rate'))}%)\n")
            out.append("- Response times (ms):\n")
            response_time_rows = (
                ("Average", "average_response_time"),
                ("Median", "median_response_time"),
                ("90th percentile", "percentile_90"),
                ("95th percentile", "percentile_95"),
                ("99th percentile", "percentile_99"),
                ("Min", "min_response_time"),
                ("Max", "max_response_time"),
            )
            for label, key in response_time_rows:
                out.append(f"  - {label}: {_fmt(summary.get(key))}\n")
            out.append(f"- Throughput: {_fmt(summary.get('throughput'))} requests/second\n")
            out.append(f"- Start time: {summary.get('start_time', 'N/A')}\n")
            out.append(f"- End time: {summary.get('end_time', 'N/A')}\n")
            out.append(f"- Duration: {_fmt(summary.get('duration'))} seconds\n\n")
            
            # Add detailed information if requested
            if detailed and "detailed" in analysis_results:
                detailed_info = analysis_results["detailed"]
                
                # Add endpoint information
                endpoints = detailed_info.get("endpoints", {})
                if endpoints:
                    out.append("Endpoint Analysis:\n")
                    for endpoint, metrics in endpoints.items():
                        out.append(f"- {endpoint}:\n")
                        out.append(f"  - Samples: {metrics.get('total_samples', 'N/A')}\n")
                        out.append(f"  - Errors: {metrics.get('error_count', 'N/A')} ({_fmt(metrics.get('error_rate'))}%)\n")
                        out.append(f"  - Average response time: {_fmt(metrics.get('average_response_time'))} ms\n")
                        out.append(f"  - 95th percentile: {_fmt(metrics.get('percentile_95'))} ms\n")
                        out.append(f"  - Throughput: {_fmt(metrics.get('throughput'))} requests/second\n")
                    out.append("\n")
                
                # Add bottleneck information
                bottlenecks = detailed_info.get("bottlenecks", {})
                if bottlenecks:
                    out.append("Bottleneck Analysis:\n")
                    
                    # Slow endpoints
                    slow_endpoints = bottlenecks.get("slow_endpoints", [])
                    if slow_endpoints:
                        out.append("- Slow Endpoints:\n")
                        for endpoint in slow_endpoints:
                            out.append(f"  - {endpoint.get('endpoint')}: {_fmt(endpoint.get('response_time'))} ms ")
                            out.append(f"(Severity: {endpoint.get('severity')})\n")
                        out.append("\n")
                    
                    # Error-prone endpoints
                    error_endpoints = bottlenecks.get("error_prone_endpoints", [])
                    if error_endpoints:
                        out.append("- Error-Prone Endpoints:\n")
                        for endpoint in error_endpoints:
                            out.append(f"  - {endpoint.get('endpoint')}: {_fmt(endpoint.get('error_rate'))}% ")
                            out.append(f"(Severity: {endpoint.get('severity')})\n")
                        out.append("\n")
                    
                    # Anomalies
                    anomalies = bottlenecks.get("anomalies", [])
                    if anomalies:
                        out.append("- Response Time Anomalies:\n")
                        for anomaly in anomalies[:3]:  # Show only top 3 anomalies
                            out.append(f"  - At {anomaly.get('timestamp')}: ")
                            out.append(f"Expected {_fmt(anomaly.get('expected_value'))} ms, ")
                            out.append(f"Got {_fmt(anomaly.get('actual_value'))} ms ")
                            out.append(f"({_fmt(anomaly.get('deviation_percentage'))}% deviation)\n")
                        out.append("\n")
                    
                    # Concurrency impact
                    concurrency = bottlenecks.get("concurrency_impact", {})
                    if concurrency:
                        out.append("- Concurrency Impact:\n")
                        correlation = concurrency.get("correlation", 0)
                        out.append(f"  - Correlation between threads and response time: {_fmt(correlation)}\n")
                        
                        if concurrency.get("has_degradation", False):
                            out.append(f"  - Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n")
                        else:
                            out.append("  - No significant performance degradation detected with increasing threads\n")
                        out.append("\n")
                
                # Add insights and recommendations
                insights = detailed_info.get("insights", {})
                if insights:
                    out.append("Insights and Recommendations:\n")
                    
                    # Recommendations
                    recommendations = insights.get("recommendations", [])
                    if recommendations:
                        out.append("- Top Recommendations:\n")
                        for rec in recommendations[:3]:  # Show only top 3 recommendations
                            out.append(f"  - [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n")
                            out.append(f"    Recommendation: {rec.get('recommendation')}\n")
                            out.append(f"    Expected Impact: {rec.get('expected_impact')}\n")
                        out.append("\n")
                    
                    # Scaling insights
                    scaling_insights = insights.get("scaling_insights", [])
                    if scaling_insights:
                        out.append("- Scaling Insights:\n")
                        for insight in scaling_insights[:2]:  # Show only top 2 insights
                            out.append(f"  - {insight.get('topic')}: {insight.get('description')}\n")
                        out.append("\n")
                
                # Add time series information (just a summary)
                time_series = detailed_info.get("time_series", [])
                if time_series:
                    out.append("Time Series Analysis:\n")
                    out.append(f"- Intervals: {len(time_series)}\n")
                    out.append("- Interval duration: 5 seconds\n")
                    
                    # Calculate average throughput and response time over intervals
                    interval_count = len(time_series)
                    avg_throughput = sum(ts.get('throughput', 0) for ts in time_series) / interval_count
                    avg_response_time = sum(ts.get('average_response_time', 0) for ts in time_series) / interval_count
                    
                    out.append(f"- Average throughput over intervals: {_fmt(avg_throughput)} requests/second\n")
                    out.append(f"- Average response time over intervals: {_fmt(avg_response_time)} ms\n\n")
            
            return "".join(out)
            
        except ValueError as e:
            return f"Error analyzing JTL file: {str(e)}"
        
    except Exception as e:
        # Tool boundary: report every other failure as text rather than raising.
        return f"Error analyzing JMeter results: {str(e)}"
@mcp.tool()
async def identify_performance_bottlenecks(jtl_file: str) -> str:
    """Identify performance bottlenecks in JMeter test results.
    
    Runs a detailed analysis and reports slow endpoints, error-prone
    endpoints, response time anomalies, concurrency impact and up to five
    recommendations.
    
    Args:
        jtl_file: Path to the JTL file containing test results
        
    Returns:
        str: Bottleneck analysis results in a formatted string, or a message
        starting with ``Error`` when the file is missing or the analysis fails
    """
    def _fmt(value: Any) -> str:
        """Format a metric with two decimals; 'N/A' when missing or non-numeric."""
        # Applying ':.2f' directly to a missing value (None) raises and
        # would abort the whole report.
        try:
            return format(value, ".2f")
        except (TypeError, ValueError):
            return "N/A"

    try:
        analyzer = TestResultsAnalyzer()
        
        # Validate file exists
        file_path = Path(jtl_file)
        if not file_path.exists():
            return f"Error: JTL file not found: {jtl_file}"
        
        try:
            # Analyze the file with detailed analysis
            analysis_results = analyzer.analyze_file(file_path, detailed=True)
            
            # Add bottleneck information
            detailed_info = analysis_results.get("detailed", {})
            bottlenecks = detailed_info.get("bottlenecks", {})
            
            if not bottlenecks:
                return f"No bottlenecks identified in {jtl_file}."
            
            # Collect the report fragments and join them once at the end
            out = [f"Performance Bottleneck Analysis of {jtl_file}:\n\n"]
            
            # Slow endpoints
            slow_endpoints = bottlenecks.get("slow_endpoints", [])
            if slow_endpoints:
                out.append("Slow Endpoints:\n")
                for endpoint in slow_endpoints:
                    out.append(f"- {endpoint.get('endpoint')}: {_fmt(endpoint.get('response_time'))} ms ")
                    out.append(f"(Severity: {endpoint.get('severity')})\n")
                out.append("\n")
            else:
                out.append("No slow endpoints identified.\n\n")
            
            # Error-prone endpoints
            error_endpoints = bottlenecks.get("error_prone_endpoints", [])
            if error_endpoints:
                out.append("Error-Prone Endpoints:\n")
                for endpoint in error_endpoints:
                    out.append(f"- {endpoint.get('endpoint')}: {_fmt(endpoint.get('error_rate'))}% ")
                    out.append(f"(Severity: {endpoint.get('severity')})\n")
                out.append("\n")
            else:
                out.append("No error-prone endpoints identified.\n\n")
            
            # Anomalies
            anomalies = bottlenecks.get("anomalies", [])
            if anomalies:
                out.append("Response Time Anomalies:\n")
                for anomaly in anomalies:
                    out.append(f"- At {anomaly.get('timestamp')}: ")
                    out.append(f"Expected {_fmt(anomaly.get('expected_value'))} ms, ")
                    out.append(f"Got {_fmt(anomaly.get('actual_value'))} ms ")
                    out.append(f"({_fmt(anomaly.get('deviation_percentage'))}% deviation)\n")
                out.append("\n")
            else:
                out.append("No response time anomalies detected.\n\n")
            
            # Concurrency impact
            concurrency = bottlenecks.get("concurrency_impact", {})
            if concurrency:
                out.append("Concurrency Impact:\n")
                correlation = concurrency.get("correlation", 0)
                out.append(f"- Correlation between threads and response time: {_fmt(correlation)}\n")
                
                if concurrency.get("has_degradation", False):
                    out.append(f"- Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n")
                else:
                    out.append("- No significant performance degradation detected with increasing threads\n")
                out.append("\n")
            
            # Add recommendations
            insights = detailed_info.get("insights", {})
            recommendations = insights.get("recommendations", [])
            
            if recommendations:
                out.append("Recommendations:\n")
                for rec in recommendations[:5]:  # Show top 5 recommendations
                    out.append(f"- [{rec.get('priority_level', 'medium').upper()}] {rec.get('recommendation')}\n")
            else:
                out.append("No specific recommendations available.\n")
            
            return "".join(out)
            
        except ValueError as e:
            return f"Error analyzing JTL file: {str(e)}"
        
    except Exception as e:
        # Tool boundary: report every other failure as text rather than raising.
        return f"Error identifying performance bottlenecks: {str(e)}"
@mcp.tool()
async def get_performance_insights(jtl_file: str) -> str:
    """Get insights and recommendations for improving performance based on JMeter test results.
    
    Runs a detailed analysis and reports up to five recommendations, all
    scaling insights, and a short test summary for context.
    
    Args:
        jtl_file: Path to the JTL file containing test results
        
    Returns:
        str: Performance insights and recommendations in a formatted string,
        or a message starting with ``Error`` when the file is missing or the
        analysis fails
    """
    def _fmt(value: Any) -> str:
        """Format a metric with two decimals; 'N/A' when missing or non-numeric."""
        # Applying ':.2f' directly to a missing value (None or an 'N/A'
        # default) raises and would abort the whole report.
        try:
            return format(value, ".2f")
        except (TypeError, ValueError):
            return "N/A"

    try:
        analyzer = TestResultsAnalyzer()
        
        # Validate file exists
        file_path = Path(jtl_file)
        if not file_path.exists():
            return f"Error: JTL file not found: {jtl_file}"
        
        try:
            # Analyze the file with detailed analysis
            analysis_results = analyzer.analyze_file(file_path, detailed=True)
            
            # Add insights information
            detailed_info = analysis_results.get("detailed", {})
            insights = detailed_info.get("insights", {})
            
            if not insights:
                return f"No insights available for {jtl_file}."
            
            # Collect the report fragments and join them once at the end
            out = [f"Performance Insights for {jtl_file}:\n\n"]
            
            # Recommendations
            recommendations = insights.get("recommendations", [])
            if recommendations:
                out.append("Recommendations:\n")
                for i, rec in enumerate(recommendations[:5], 1):  # Show top 5 recommendations
                    out.append(f"{i}. [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n")
                    out.append(f"   - Recommendation: {rec.get('recommendation')}\n")
                    out.append(f"   - Expected Impact: {rec.get('expected_impact')}\n")
                    out.append(f"   - Implementation Difficulty: {rec.get('implementation_difficulty')}\n\n")
            else:
                out.append("No specific recommendations available.\n\n")
            
            # Scaling insights
            scaling_insights = insights.get("scaling_insights", [])
            if scaling_insights:
                out.append("Scaling Insights:\n")
                for i, insight in enumerate(scaling_insights, 1):
                    out.append(f"{i}. {insight.get('topic')}\n")
                    out.append(f"   {insight.get('description')}\n\n")
            else:
                out.append("No scaling insights available.\n\n")
            
            # Add summary metrics for context
            summary = analysis_results.get("summary", {})
            out.append("Test Summary:\n")
            out.append(f"- Total samples: {summary.get('total_samples', 'N/A')}\n")
            out.append(f"- Error rate: {_fmt(summary.get('error_rate'))}%\n")
            out.append(f"- Average response time: {_fmt(summary.get('average_response_time'))} ms\n")
            out.append(f"- 95th percentile: {_fmt(summary.get('percentile_95'))} ms\n")
            out.append(f"- Throughput: {_fmt(summary.get('throughput'))} requests/second\n")
            
            return "".join(out)
            
        except ValueError as e:
            return f"Error analyzing JTL file: {str(e)}"
        
    except Exception as e:
        # Tool boundary: report every other failure as text rather than raising.
        return f"Error getting performance insights: {str(e)}"
@mcp.tool()
async def generate_visualization(jtl_file: str, visualization_type: str, output_file: str) -> str:
    """Generate visualizations of JMeter test results.
    
    Runs a detailed analysis of the JTL file and hands the relevant part of
    the results to the visualization engine.
    
    Args:
        jtl_file: Path to the JTL file containing test results
        visualization_type: Type of visualization to generate (time_series, distribution, comparison, html_report)
        output_file: Path to save the visualization
        
    Returns:
        str: A message containing the path to the generated visualization
        file, or a message describing why nothing was generated (missing
        file, no data, unknown type, or an error)
    """
    try:
        analyzer = TestResultsAnalyzer()
        
        # Validate file exists
        file_path = Path(jtl_file)
        if not file_path.exists():
            return f"Error: JTL file not found: {jtl_file}"
        
        try:
            # Analyze the file with detailed analysis
            analysis_results = analyzer.analyze_file(file_path, detailed=True)
            detailed_info = analysis_results.get("detailed", {})
            
            # Create visualization engine. The engine joins its output_dir
            # with the file name it is given, so pass only the base name;
            # passing the full path again would yield "dir/dir/file" for
            # relative paths.
            output_dir = os.path.dirname(output_file) if output_file else None
            target_name = os.path.basename(output_file) if output_file else output_file
            engine = VisualizationEngine(output_dir=output_dir)
            
            # Generate visualization based on type
            if visualization_type == "time_series":
                # Extract time series metrics
                time_series = detailed_info.get("time_series", [])
                if not time_series:
                    return "No time series data available for visualization."
                
                # TimeSeriesMetrics is not imported at module level in this
                # file; assumed to live in analyzer.models next to
                # TestResults - TODO confirm.
                from analyzer.models import TimeSeriesMetrics
                
                # Convert to TimeSeriesMetrics objects
                metrics = [
                    TimeSeriesMetrics(
                        timestamp=datetime.datetime.fromisoformat(ts_data["timestamp"]),
                        active_threads=ts_data["active_threads"],
                        throughput=ts_data["throughput"],
                        average_response_time=ts_data["average_response_time"],
                        error_rate=ts_data["error_rate"]
                    )
                    for ts_data in time_series
                ]
                
                # Create visualization
                output_path = engine.create_time_series_graph(
                    metrics, metric_name="average_response_time", output_file=target_name)
                return f"Time series graph generated: {output_path}"
                
            elif visualization_type == "distribution":
                # Approximate the sample distribution: each endpoint's average
                # response time is repeated once per sample of that endpoint.
                samples = []
                for endpoint_data in detailed_info.get("endpoints", {}).values():
                    samples.extend([endpoint_data["average_response_time"]] * endpoint_data["total_samples"])
                
                if not samples:
                    return "No response time data available for visualization."
                
                # Create visualization
                output_path = engine.create_distribution_graph(samples, output_file=target_name)
                return f"Distribution graph generated: {output_path}"
                
            elif visualization_type == "comparison":
                # Extract endpoint metrics
                endpoints = detailed_info.get("endpoints", {})
                if not endpoints:
                    return "No endpoint data available for visualization."
                
                # EndpointMetrics is not imported at module level in this
                # file; assumed to live in analyzer.models next to
                # TestResults - TODO confirm.
                from analyzer.models import EndpointMetrics
                
                # Convert to EndpointMetrics objects
                endpoint_metrics = {}
                for endpoint, metrics_data in endpoints.items():
                    endpoint_metrics[endpoint] = EndpointMetrics(
                        endpoint=endpoint,
                        total_samples=metrics_data["total_samples"],
                        error_count=metrics_data["error_count"],
                        error_rate=metrics_data["error_rate"],
                        average_response_time=metrics_data["average_response_time"],
                        median_response_time=metrics_data["median_response_time"],
                        percentile_90=metrics_data["percentile_90"],
                        percentile_95=metrics_data["percentile_95"],
                        percentile_99=metrics_data["percentile_99"],
                        min_response_time=metrics_data["min_response_time"],
                        max_response_time=metrics_data["max_response_time"],
                        throughput=metrics_data["throughput"],
                        test_duration=analysis_results["summary"]["duration"]
                    )
                
                # Create visualization
                output_path = engine.create_endpoint_comparison_chart(
                    endpoint_metrics, metric_name="average_response_time", output_file=target_name)
                return f"Endpoint comparison chart generated: {output_path}"
                
            elif visualization_type == "html_report":
                # Create HTML report
                output_path = engine.create_html_report(analysis_results, target_name)
                return f"HTML report generated: {output_path}"
                
            else:
                return f"Unknown visualization type: {visualization_type}. " \
                       f"Supported types: time_series, distribution, comparison, html_report"
            
        except ValueError as e:
            return f"Error generating visualization: {str(e)}"
        
    except Exception as e:
        # Tool boundary: report every other failure as text rather than raising.
        return f"Error generating visualization: {str(e)}"
def generate_unique_id():
    """
    Build an identifier from the current local time and a random UUID fragment.
    
    Returns:
        str: ``YYYYmmdd_HHMMSS_xxxxxxxx`` - a second-resolution timestamp
        followed by the first 8 hex characters of a fresh UUID4
    """
    now = datetime.datetime.now()
    # The first 8 characters of a UUID's text form are its first 8 hex digits.
    uuid_prefix = uuid.uuid4().hex[:8]
    return "_".join((now.strftime("%Y%m%d_%H%M%S"), uuid_prefix))
# When executed as a script, start the MCP server using the stdio transport.
if __name__ == "__main__":
    mcp.run(transport='stdio')
```