# Directory Structure
```
├── .dockerignore
├── .env.example
├── .gitignore
├── .python-version
├── analyzer
│   ├── __init__.py
│   ├── analyzer.py
│   ├── bottleneck
│   │   ├── __init__.py
│   │   └── analyzer.py
│   ├── insights
│   │   ├── __init__.py
│   │   └── generator.py
│   ├── mcp
│   │   └── __init__.py
│   ├── metrics
│   │   ├── __init__.py
│   │   └── calculator.py
│   ├── models.py
│   ├── parser
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── csv_parser.py
│   │   └── xml_parser.py
│   └── visualization
│       ├── __init__.py
│       └── engine.py
├── Dockerfile
├── images
│   ├── Anthropic-MCP.png
│   ├── Cursor.png
│   └── Windsurf.png
├── jmeter_report.html
├── jmeter_server.py
├── main.py
├── mcp_config.json
├── pyproject.toml
├── README.md
├── requirements_windsurf_reader.txt
├── requirements.txt
├── sample_test.jmx
├── smithery.yaml
├── tests
│   ├── __init__.py
│   ├── test_analyzer_models.py
│   ├── test_analyzer_parser.py
│   ├── test_bottleneck_analyzer.py
│   ├── test_csv_parser.py
│   ├── test_insights_generator.py
│   ├── test_jmeter_server.py
│   ├── test_metrics_calculator.py
│   ├── test_visualization_engine.py
│   └── test_xml_parser.py
├── windsurf_db_reader_alternative.py
└── windsurf_db_reader.py
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
.git
.gitignore
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.env
*.log
.DS_Store
Dockerfile
.dockerignore
README.md
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# JMeter Configuration
JMETER_HOME=/path/to/apache-jmeter-5.6.3
JMETER_BIN=${JMETER_HOME}/bin/jmeter
# Optional: JMeter Java options
JMETER_JAVA_OPTS="-Xms1g -Xmx2g"
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
.DS_Store
.env
*.jtl
*.csv
.kiro/
*.zip
*.chat
.kiro/debug/chats/1.chat
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
````markdown
# 🚀 JMeter MCP Server
This is a Model Context Protocol (MCP) server that lets MCP-compatible clients execute JMeter tests and analyze the test results.
> [!IMPORTANT]
> 📢 Looking for an AI Assistant inside JMeter? 🚀
> Check out [Feather Wand](https://jmeter.ai)



## 📋 Features
### JMeter Execution
- 📊 Execute JMeter tests in non-GUI mode
- 🖥️ Launch JMeter in GUI mode
- 📝 Capture and return execution output
- 📊 Generate JMeter report dashboard
### Test Results Analysis
- 📈 Parse and analyze JMeter test results (JTL files)
- 📊 Calculate comprehensive performance metrics
- 🔍 Identify performance bottlenecks automatically
- 💡 Generate actionable insights and recommendations
- 📊 Create visualizations of test results
- 📑 Generate HTML reports with analysis results
## 🛠️ Installation
### Local Installation
1. Install [`uv`](https://github.com/astral-sh/uv).
2. Ensure JMeter is installed on your system and accessible via the command line.
⚠️ **Important**: Make sure JMeter is executable. You can do this by running:
```bash
chmod +x /path/to/jmeter/bin/jmeter
```
3. Install required Python dependencies:
```bash
pip install numpy matplotlib
```
4. Configure the `.env` file (see `.env.example` for reference):
```bash
# JMeter Configuration
JMETER_HOME=/path/to/apache-jmeter-5.6.3
JMETER_BIN=${JMETER_HOME}/bin/jmeter
# Optional: JMeter Java options
JMETER_JAVA_OPTS="-Xms1g -Xmx2g"
```
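At startup the server loads these values with `python-dotenv` (see `main.py`); a minimal sketch of that lookup:
```python
from dotenv import load_dotenv
import os

load_dotenv()  # reads .env from the working directory

jmeter_home = os.getenv('JMETER_HOME')     # e.g. /path/to/apache-jmeter-5.6.3
jmeter_bin = os.getenv('JMETER_BIN')       # path to the jmeter executable
java_opts = os.getenv('JMETER_JAVA_OPTS')  # optional JVM options
```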
### 💻 MCP Usage
1. Connect to the server using an MCP-compatible client (e.g., Claude Desktop, Cursor, Windsurf)
2. Send a prompt to the server:
```
Run JMeter test /path/to/test.jmx
```
3. The MCP-compatible client will use the available tools:
#### JMeter Execution Tools
- 🖥️ `execute_jmeter_test`: Launches JMeter in GUI mode; by design, JMeter does not execute the test plan automatically in GUI mode
- 🚀 `execute_jmeter_test_non_gui`: Execute a JMeter test in non-GUI mode (default mode for better performance)
#### Test Results Analysis Tools
- 📊 `analyze_jmeter_results`: Analyze JMeter test results and provide a summary of key metrics and insights
- 🔍 `identify_performance_bottlenecks`: Identify performance bottlenecks in JMeter test results
- 💡 `get_performance_insights`: Get insights and recommendations for improving performance
- 📈 `generate_visualization`: Generate visualizations of JMeter test results
## 🏗️ MCP Configuration
Add the following configuration to your MCP client config:
```json
{
  "mcpServers": {
    "jmeter": {
      "command": "/path/to/uv",
      "args": [
        "--directory",
        "/path/to/jmeter-mcp-server",
        "run",
        "jmeter_server.py"
      ]
    }
  }
}
```
## ✨ Use Cases
### Test Execution
- Run JMeter tests in non-GUI mode for better performance
- Launch JMeter in GUI mode for test development
- Generate JMeter report dashboards
### Test Results Analysis
- Analyze JTL files to understand performance characteristics
- Identify performance bottlenecks and their severity
- Get actionable recommendations for performance improvements
- Generate visualizations for better understanding of results
- Create comprehensive HTML reports for sharing with stakeholders
## 🛑 Error Handling
The server will:
- Validate that the test file exists
- Check that the file has a .jmx extension
- Validate that JTL files exist and have valid formats
- Capture and return any execution or analysis errors
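From a caller's point of view, validation failures come back as plain error strings rather than exceptions (message formats taken from `tests/test_jmeter_server.py`); the sketch below assumes the server module and its dependencies are importable:
```python
import asyncio

import jmeter_server  # the server module in this repository

async def main() -> None:
    result = await jmeter_server.run_jmeter("nonexistent.jmx")
    if result.startswith("Error:"):
        # e.g. "Error: Test file not found: nonexistent.jmx"
        # or   "Error: Invalid file type. Expected .jmx file: <path>"
        print(f"Validation failed: {result}")

asyncio.run(main())
```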
## 📊 Test Results Analyzer
The Test Results Analyzer is a powerful feature that helps you understand your JMeter test results better. It consists of several components:
### Parser Module
- Supports both XML and CSV JTL formats
- Efficiently processes large files with streaming parsers
- Validates file formats and handles errors gracefully
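A minimal sketch of how the parsers are used together (`results.jtl` is a placeholder path):
```python
from analyzer.parser.base import JTLParser
from analyzer.parser.csv_parser import CSVJTLParser
from analyzer.parser.xml_parser import XMLJTLParser

jtl_file = "results.jtl"  # placeholder
fmt = JTLParser.detect_format(jtl_file)             # returns 'xml' or 'csv'
parser = XMLJTLParser() if fmt == "xml" else CSVJTLParser()
results = parser.parse_file(jtl_file)               # -> analyzer.models.TestResults
print(f"{len(results.samples)} samples between {results.start_time} and {results.end_time}")
```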
### Metrics Calculator
- Calculates overall performance metrics (average, median, percentiles)
- Provides endpoint-specific metrics for detailed analysis
- Generates time series metrics to track performance over time
- Compares metrics with benchmarks for context
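`analyzer/metrics/calculator.py` is not reproduced in this document, so the sketch below only illustrates the kind of arithmetic involved, computed directly from the data models; the real `MetricsCalculator` API may differ:
```python
import statistics

from analyzer.models import OverallMetrics, TestResults

def summarize(results: TestResults) -> OverallMetrics:
    """Illustrative summary of a TestResults object (not the MetricsCalculator API)."""
    if not results.samples:
        return OverallMetrics()
    times = sorted(s.response_time for s in results.samples)
    errors = sum(1 for s in results.samples if not s.success)
    duration = (results.end_time - results.start_time).total_seconds() or 1.0
    return OverallMetrics(
        total_samples=len(times),
        error_count=errors,
        error_rate=100.0 * errors / len(times),
        average_response_time=statistics.mean(times),
        median_response_time=statistics.median(times),
        percentile_90=times[int(0.90 * (len(times) - 1))],
        percentile_95=times[int(0.95 * (len(times) - 1))],
        percentile_99=times[int(0.99 * (len(times) - 1))],
        min_response_time=times[0],
        max_response_time=times[-1],
        throughput=len(times) / duration,
        test_duration=duration,
    )
```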
### Bottleneck Analyzer
- Identifies slow endpoints based on response times
- Detects error-prone endpoints with high error rates
- Finds response time anomalies and outliers
- Analyzes the impact of concurrency on performance
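The method names and thresholds below are the ones exercised in `tests/test_bottleneck_analyzer.py`; the hand-rolled `EndpointMetrics` values stand in for what the metrics calculator would normally produce:
```python
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.models import EndpointMetrics

# Illustrative per-endpoint metrics; endpoint names are placeholders.
metrics = {
    "Home": EndpointMetrics(endpoint="Home", total_samples=100,
                            average_response_time=100.0, error_rate=0.0),
    "Search": EndpointMetrics(endpoint="Search", total_samples=100,
                              average_response_time=120.0, error_rate=1.0, error_count=1),
    "Checkout": EndpointMetrics(endpoint="Checkout", total_samples=100,
                                average_response_time=500.0, error_rate=15.0, error_count=15),
}

analyzer = BottleneckAnalyzer()
slow = analyzer.identify_slow_endpoints(metrics, threshold_factor=2.0)
error_prone = analyzer.identify_error_prone_endpoints(metrics, threshold_error_rate=5.0)
for b in slow + error_prone:
    print(f"[{b.severity}] {b.endpoint}: {b.metric_type}={b.value} (threshold {b.threshold})")
```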
### Insights Generator
- Provides specific recommendations for addressing bottlenecks
- Analyzes error patterns and suggests solutions
- Generates insights on scaling behavior and capacity limits
- Prioritizes recommendations based on potential impact
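Driving the generator from a list of bottlenecks, mirroring `tests/test_insights_generator.py` (the endpoint names are illustrative):
```python
from analyzer.insights.generator import InsightsGenerator
from analyzer.models import Bottleneck

bottlenecks = [
    Bottleneck(endpoint="Checkout", metric_type="response_time",
               value=500.0, threshold=200.0, severity="high"),
    Bottleneck(endpoint="Login", metric_type="error_rate",
               value=15.0, threshold=5.0, severity="high"),
]

generator = InsightsGenerator()
recommendations = generator.generate_bottleneck_recommendations(bottlenecks)
for rec in recommendations:
    print(f"{rec.issue}: {rec.recommendation} ({rec.implementation_difficulty})")

# Prioritized items are returned sorted by priority score, highest first
for item in generator.prioritize_recommendations(recommendations):
    print(item["priority_level"], item["priority_score"])
```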
### Visualization Engine
- Creates time series graphs showing performance over time
- Generates distribution graphs for response time analysis
- Produces endpoint comparison charts for identifying issues
- Creates comprehensive HTML reports with all analysis results
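`analyzer/visualization/engine.py` is not reproduced in this document, so rather than guessing its API, the snippet below shows the kind of time-series graph it produces, drawn directly with matplotlib from parsed samples (`results.jtl` is a placeholder):
```python
import matplotlib
matplotlib.use("Agg")  # render to a file, no display needed
import matplotlib.pyplot as plt

from analyzer.parser.csv_parser import CSVJTLParser

results = CSVJTLParser().parse_file("results.jtl")  # placeholder path
times = [s.timestamp for s in results.samples]
elapsed = [s.response_time for s in results.samples]

plt.figure(figsize=(10, 4))
plt.plot(times, elapsed, linewidth=0.8)
plt.xlabel("Time")
plt.ylabel("Response time (ms)")
plt.title("Response times over the test run")
plt.tight_layout()
plt.savefig("response_times.png")
```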
## 📝 Example Usage
```
# Run a JMeter test and generate a results file
Run JMeter test sample_test.jmx in non-GUI mode and save results to results.jtl
# Analyze the results
Analyze the JMeter test results in results.jtl and provide detailed insights
# Identify bottlenecks
What are the performance bottlenecks in the results.jtl file?
# Get recommendations
What recommendations do you have for improving performance based on results.jtl?
# Generate visualizations
Create a time series graph of response times from results.jtl
```
````
--------------------------------------------------------------------------------
/requirements_windsurf_reader.txt:
--------------------------------------------------------------------------------
```
plyvel>=1.3.0
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp[cli]<1.6.0
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Test package initializer for jmeter_server tests.
"""
```
--------------------------------------------------------------------------------
/analyzer/mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""
MCP interface for the JMeter Test Results Analyzer.
This module provides MCP tools for analyzing JMeter test results.
"""
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "jmeter-mcp-server"
version = "0.1.0"
description = "JMeter MCP Server"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "httpx>=0.28.1",
    "mcp[cli]>=1.6.0",
]
```
--------------------------------------------------------------------------------
/mcp_config.json:
--------------------------------------------------------------------------------
```json
{
  "mcpServers": {
    "jmeter": {
      "command": "/path/to/uv",
      "args": [
        "--directory",
        "/path/to/jmeter-mcp-server",
        "run",
        "jmeter_server.py"
      ]
    }
  }
}
```
--------------------------------------------------------------------------------
/analyzer/metrics/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Metrics module for JMeter test results.
This module provides functionality for calculating performance metrics
from JMeter test results.
"""
from analyzer.metrics.calculator import MetricsCalculator
__all__ = ['MetricsCalculator']
```
--------------------------------------------------------------------------------
/analyzer/__init__.py:
--------------------------------------------------------------------------------
```python
"""
JMeter Test Results Analyzer module.
This module provides functionality for analyzing JMeter test results,
calculating performance metrics, identifying bottlenecks, and generating
insights and recommendations.
"""
__version__ = '0.1.0'
```
--------------------------------------------------------------------------------
/analyzer/bottleneck/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Bottleneck analyzer module for JMeter test results.
This module provides functionality for identifying performance bottlenecks
in JMeter test results.
"""
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
__all__ = ['BottleneckAnalyzer']
```
--------------------------------------------------------------------------------
/analyzer/visualization/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Visualization module for JMeter test results.
This module provides functionality for creating visual representations
of JMeter test results analysis.
"""
from analyzer.visualization.engine import VisualizationEngine
__all__ = ['VisualizationEngine']
```
--------------------------------------------------------------------------------
/analyzer/insights/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Insights module for JMeter test results.
This module provides functionality for generating insights and recommendations
based on JMeter test results analysis.
"""
from analyzer.insights.generator import InsightsGenerator
__all__ = ['InsightsGenerator']
```
--------------------------------------------------------------------------------
/analyzer/parser/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Parser module for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in both XML and CSV formats.
"""
from analyzer.parser.base import JTLParser
from analyzer.parser.xml_parser import XMLJTLParser
from analyzer.parser.csv_parser import CSVJTLParser
__all__ = ['JTLParser', 'XMLJTLParser', 'CSVJTLParser']
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
import os
# Load environment variables
load_dotenv()
# Initialize MCP server
mcp = FastMCP("jmeter")
def main():
    print("Starting JMeter MCP server...")
    print(os.getenv('JMETER_HOME'))
    print(os.getenv('JMETER_BIN'))
    print(os.getenv('JMETER_JAVA_OPTS'))
    mcp.run(transport='stdio')
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties: {}
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    (config) => ({command: 'python', args: [
      "jmeter_server.py"
    ], env: {}})
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Use Python base image
FROM python:3.10-slim
# Install OpenJDK and build dependencies
RUN apt-get update && \
apt-get install -y default-jdk wget && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Install JMeter
ENV JMETER_VERSION="5.6.3"
ENV JMETER_HOME="/opt/apache-jmeter-${JMETER_VERSION}"
ENV PATH="$JMETER_HOME/bin:$PATH"
RUN wget https://archive.apache.org/dist/jmeter/binaries/apache-jmeter-${JMETER_VERSION}.tgz && \
tar -xzf apache-jmeter-${JMETER_VERSION}.tgz -C /opt && \
rm apache-jmeter-${JMETER_VERSION}.tgz
# Set working directory
WORKDIR /app
# Copy application files
COPY . .
# Install Python dependencies
RUN pip install --upgrade pip && \
pip install "mcp[cli]<1.6.0" && \
pip install --no-cache-dir -r requirements.txt
# Expose port (adjust if your server uses a different port)
EXPOSE 8000
# Run the server
CMD ["python", "jmeter_server.py"]
```
--------------------------------------------------------------------------------
/analyzer/models.py:
--------------------------------------------------------------------------------
```python
"""
Data models for the JMeter Test Results Analyzer.
This module defines the core data structures used throughout the analyzer,
including TestResults, Sample, and various metrics classes.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
@dataclass
class Sample:
    """Represents a single sample/request in a JMeter test."""
    timestamp: datetime
    label: str
    response_time: int  # in milliseconds
    success: bool
    response_code: str
    error_message: Optional[str] = None
    thread_name: Optional[str] = None
    bytes_sent: Optional[int] = None
    bytes_received: Optional[int] = None
    latency: Optional[int] = None  # in milliseconds
    connect_time: Optional[int] = None  # in milliseconds
@dataclass
class TestResults:
    """Represents the results of a JMeter test."""
    samples: List[Sample] = field(default_factory=list)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    def add_sample(self, sample: Sample) -> None:
        """Add a sample to the test results."""
        self.samples.append(sample)
        # Update start and end times
        if self.start_time is None or sample.timestamp < self.start_time:
            self.start_time = sample.timestamp
        if self.end_time is None or sample.timestamp > self.end_time:
            self.end_time = sample.timestamp
@dataclass
class OverallMetrics:
    """Represents overall metrics for a test or endpoint."""
    total_samples: int = 0
    error_count: int = 0
    error_rate: float = 0.0
    average_response_time: float = 0.0
    median_response_time: float = 0.0
    percentile_90: float = 0.0
    percentile_95: float = 0.0
    percentile_99: float = 0.0
    min_response_time: float = 0.0
    max_response_time: float = 0.0
    throughput: float = 0.0  # requests per second
    test_duration: float = 0.0  # in seconds
@dataclass
class EndpointMetrics(OverallMetrics):
    """Represents metrics for a specific endpoint/sampler."""
    endpoint: str = ""
@dataclass
class TimeSeriesMetrics:
    """Represents metrics for a specific time interval."""
    timestamp: datetime
    active_threads: int = 0
    throughput: float = 0.0
    average_response_time: float = 0.0
    error_rate: float = 0.0
@dataclass
class Bottleneck:
    """Represents a performance bottleneck."""
    endpoint: str
    metric_type: str  # response_time, error_rate, etc.
    value: float
    threshold: float
    severity: str  # high, medium, low
@dataclass
class Anomaly:
    """Represents a performance anomaly."""
    timestamp: datetime
    endpoint: str
    expected_value: float
    actual_value: float
    deviation_percentage: float
@dataclass
class Recommendation:
    """Represents a performance improvement recommendation."""
    issue: str
    recommendation: str
    expected_impact: str
    implementation_difficulty: str  # high, medium, low
@dataclass
class Insight:
    """Represents a performance insight."""
    topic: str
    description: str
    supporting_data: Dict = field(default_factory=dict)
```
--------------------------------------------------------------------------------
/analyzer/parser/base.py:
--------------------------------------------------------------------------------
```python
"""
Base parser interface for JMeter test results.
This module defines the base interface for JTL parsers.
"""
import abc
from pathlib import Path
from typing import Union
from analyzer.models import TestResults
class JTLParser(abc.ABC):
    """Base class for JTL parsers."""
    @abc.abstractmethod
    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file and return structured test results.
        Args:
            file_path: Path to the JTL file
        Returns:
            TestResults object containing parsed data
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is invalid
        """
        pass
    @staticmethod
    def validate_file(file_path: Union[str, Path]) -> bool:
        """Validate that the file exists and has a valid extension.
        Args:
            file_path: Path to the JTL file
        Returns:
            True if the file is valid, False otherwise
        """
        path = Path(file_path)
        # Check if file exists
        if not path.exists():
            return False
        # Check if file has a valid extension
        valid_extensions = ['.jtl', '.xml', '.csv']
        if path.suffix.lower() not in valid_extensions:
            return False
        return True
    @staticmethod
    def detect_format(file_path: Union[str, Path]) -> str:
        """Detect whether the JTL file is in XML or CSV format.
        Args:
            file_path: Path to the JTL file
        Returns:
            'xml' or 'csv'
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the format cannot be determined
        """
        path = Path(file_path)
        # Check if file exists
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        # Try to determine format based on content
        with open(path, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()
            # Check for XML declaration
            if first_line.startswith('<?xml'):
                return 'xml'
            # Check for CSV header
            if ',' in first_line and ('timeStamp' in first_line or 'elapsed' in first_line):
                return 'csv'
            # If we can't determine from the first line, check file extension
            if path.suffix.lower() == '.xml':
                return 'xml'
            if path.suffix.lower() == '.csv':
                return 'csv'
            if path.suffix.lower() == '.jtl':
                # For .jtl files, we need to look at more content
                f.seek(0)
                content = f.read(1000)  # Read first 1000 chars
                if '<?xml' in content:
                    return 'xml'
                if ',' in content and ('timeStamp' in content or 'elapsed' in content):
                    return 'csv'
        raise ValueError(f"Could not determine format of file: {file_path}")
```
--------------------------------------------------------------------------------
/tests/test_xml_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the XML JTL parser.
"""
import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from analyzer.parser.xml_parser import XMLJTLParser
class TestXMLJTLParser(unittest.TestCase):
"""Tests for the XMLJTLParser class."""
def setUp(self):
"""Set up test fixtures."""
self.parser = XMLJTLParser()
# Create a sample XML JTL file
self.xml_content = """<?xml version="1.0" encoding="UTF-8"?>
<testResults version="1.2">
<httpSample t="1234" lt="1000" ts="1625097600000" s="true" lb="Home Page" rc="200" rm="" tn="Thread Group 1-1" by="1234" sby="1234" ct="800"/>
<httpSample t="2345" lt="2000" ts="1625097601000" s="true" lb="Login Page" rc="200" rm="" tn="Thread Group 1-1" by="2345" sby="2345" ct="900"/>
<httpSample t="3456" lt="3000" ts="1625097602000" s="false" lb="API Call" rc="500" rm="Internal Server Error" tn="Thread Group 1-2" by="3456" sby="345" ct="1000"/>
</testResults>
"""
self.xml_file = tempfile.NamedTemporaryFile(suffix='.xml', mode='w', delete=False)
self.xml_file.write(self.xml_content)
self.xml_file.close()
def tearDown(self):
"""Tear down test fixtures."""
os.unlink(self.xml_file.name)
def test_parse_file(self):
"""Test parsing an XML JTL file."""
test_results = self.parser.parse_file(self.xml_file.name)
# Check that we have the correct number of samples
self.assertEqual(len(test_results.samples), 3)
# Check the first sample
sample1 = test_results.samples[0]
self.assertEqual(sample1.label, "Home Page")
self.assertEqual(sample1.response_time, 1234)
self.assertTrue(sample1.success)
self.assertEqual(sample1.response_code, "200")
self.assertEqual(sample1.thread_name, "Thread Group 1-1")
self.assertEqual(sample1.bytes_received, 1234)
self.assertEqual(sample1.bytes_sent, 1234)
self.assertEqual(sample1.latency, 1000)
self.assertEqual(sample1.connect_time, 800)
# Check the third sample (error)
sample3 = test_results.samples[2]
self.assertEqual(sample3.label, "API Call")
self.assertEqual(sample3.response_time, 3456)
self.assertFalse(sample3.success)
self.assertEqual(sample3.response_code, "500")
self.assertEqual(sample3.error_message, "Internal Server Error")
# Check start and end times
expected_start = datetime.fromtimestamp(1625097600)
expected_end = datetime.fromtimestamp(1625097602)
self.assertEqual(test_results.start_time, expected_start)
self.assertEqual(test_results.end_time, expected_end)
def test_file_not_found(self):
"""Test parsing a non-existent file."""
with self.assertRaises(FileNotFoundError):
self.parser.parse_file('/path/to/nonexistent/file.xml')
def test_invalid_format(self):
"""Test parsing a file with invalid format."""
# Create a non-XML file
with tempfile.NamedTemporaryFile(suffix='.xml', mode='w', delete=False) as tmp:
tmp.write("This is not XML")
try:
with self.assertRaises(ValueError):
self.parser.parse_file(tmp.name)
finally:
os.unlink(tmp.name)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/analyzer/parser/xml_parser.py:
--------------------------------------------------------------------------------
```python
"""
XML parser for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in XML format using SAX for efficient processing.
"""
import xml.sax
from datetime import datetime
from pathlib import Path
from typing import Union
from analyzer.models import Sample, TestResults
from analyzer.parser.base import JTLParser
class JMeterXMLHandler(xml.sax.ContentHandler):
    """SAX handler for JMeter XML results."""
    def __init__(self, test_results: TestResults):
        """Initialize the handler.
        Args:
            test_results: TestResults object to populate
        """
        super().__init__()
        self.test_results = test_results
    def startElement(self, tag, attributes):
        """Process start element.
        Args:
            tag: Element tag name
            attributes: Element attributes
        """
        # Process httpSample or sample elements
        if tag in ["httpSample", "sample"]:
            try:
                # Parse timestamp
                ts = int(attributes.get("ts", "0")) / 1000  # Convert from ms to seconds
                timestamp = datetime.fromtimestamp(ts)
                # Create sample
                sample = Sample(
                    timestamp=timestamp,
                    label=attributes.get("lb", ""),
                    response_time=int(attributes.get("t", "0")),
                    success=attributes.get("s", "true").lower() == "true",
                    response_code=attributes.get("rc", ""),
                    error_message=attributes.get("rm", ""),
                    thread_name=attributes.get("tn", ""),
                    bytes_received=int(attributes.get("by", "0")),
                    bytes_sent=int(attributes.get("sby", "0")),
                    latency=int(attributes.get("lt", "0")),
                    connect_time=int(attributes.get("ct", "0"))
                )
                # Add sample to test results
                self.test_results.add_sample(sample)
            except (ValueError, KeyError) as e:
                # Log error but continue processing
                print(f"Error parsing sample: {e}")
class XMLJTLParser(JTLParser):
    """Parser for JMeter JTL files in XML format."""
    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file in XML format.
        Args:
            file_path: Path to the JTL file
        Returns:
            TestResults object containing parsed data
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is invalid
        """
        path = Path(file_path)
        # Validate file
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        # Detect format
        format_name = self.detect_format(path)
        if format_name != "xml":
            raise ValueError(f"Invalid file format. Expected XML, got {format_name}")
        # Create test results object
        test_results = TestResults()
        # Create SAX parser
        parser = xml.sax.make_parser()
        parser.setFeature(xml.sax.handler.feature_namespaces, 0)
        # Create and set content handler
        handler = JMeterXMLHandler(test_results)
        parser.setContentHandler(handler)
        try:
            # Parse the file
            parser.parse(str(path))
        except Exception as e:
            raise ValueError(f"Error parsing XML file: {e}")
        return test_results
```
--------------------------------------------------------------------------------
/tests/test_analyzer_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the analyzer parser module.
"""
import os
import tempfile
import unittest
from pathlib import Path
from analyzer.parser.base import JTLParser
class TestJTLParserBase(unittest.TestCase):
"""Tests for the base JTLParser class."""
def test_validate_file_exists(self):
"""Test validating that a file exists."""
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix='.jtl') as tmp:
self.assertTrue(JTLParser.validate_file(tmp.name))
def test_validate_file_not_exists(self):
"""Test validating a non-existent file."""
self.assertFalse(JTLParser.validate_file('/path/to/nonexistent/file.jtl'))
def test_validate_file_extension(self):
"""Test validating file extensions."""
# Create temporary files with different extensions
with tempfile.NamedTemporaryFile(suffix='.jtl') as jtl_file, \
tempfile.NamedTemporaryFile(suffix='.xml') as xml_file, \
tempfile.NamedTemporaryFile(suffix='.csv') as csv_file, \
tempfile.NamedTemporaryFile(suffix='.txt') as txt_file:
self.assertTrue(JTLParser.validate_file(jtl_file.name))
self.assertTrue(JTLParser.validate_file(xml_file.name))
self.assertTrue(JTLParser.validate_file(csv_file.name))
self.assertFalse(JTLParser.validate_file(txt_file.name))
def test_detect_format_xml(self):
"""Test detecting XML format."""
# Create a temporary XML file
with tempfile.NamedTemporaryFile(suffix='.xml', mode='w', delete=False) as tmp:
tmp.write('<?xml version="1.0" encoding="UTF-8"?>\n<testResults>\n</testResults>')
try:
self.assertEqual(JTLParser.detect_format(tmp.name), 'xml')
finally:
os.unlink(tmp.name)
def test_detect_format_csv(self):
"""Test detecting CSV format."""
# Create a temporary CSV file
with tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False) as tmp:
tmp.write('timeStamp,elapsed,label,responseCode,success\n')
tmp.write('1625097600000,100,Test,200,true\n')
try:
self.assertEqual(JTLParser.detect_format(tmp.name), 'csv')
finally:
os.unlink(tmp.name)
def test_detect_format_jtl_xml(self):
"""Test detecting XML format in a .jtl file."""
# Create a temporary JTL file with XML content
with tempfile.NamedTemporaryFile(suffix='.jtl', mode='w', delete=False) as tmp:
tmp.write('<?xml version="1.0" encoding="UTF-8"?>\n<testResults>\n</testResults>')
try:
self.assertEqual(JTLParser.detect_format(tmp.name), 'xml')
finally:
os.unlink(tmp.name)
def test_detect_format_jtl_csv(self):
"""Test detecting CSV format in a .jtl file."""
# Create a temporary JTL file with CSV content
with tempfile.NamedTemporaryFile(suffix='.jtl', mode='w', delete=False) as tmp:
tmp.write('timeStamp,elapsed,label,responseCode,success\n')
tmp.write('1625097600000,100,Test,200,true\n')
try:
self.assertEqual(JTLParser.detect_format(tmp.name), 'csv')
finally:
os.unlink(tmp.name)
def test_detect_format_file_not_found(self):
"""Test detecting format of a non-existent file."""
with self.assertRaises(FileNotFoundError):
JTLParser.detect_format('/path/to/nonexistent/file.jtl')
def test_detect_format_unknown(self):
"""Test detecting format of a file with unknown format."""
# Create a temporary file with unknown content
with tempfile.NamedTemporaryFile(suffix='.txt', mode='w', delete=False) as tmp:
tmp.write('This is not a JTL file\n')
try:
with self.assertRaises(ValueError):
JTLParser.detect_format(tmp.name)
finally:
os.unlink(tmp.name)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_jmeter_server.py:
--------------------------------------------------------------------------------
```python
import sys
import types
import os
import tempfile
import unittest
from unittest import mock
# Stub external dependencies before importing jmeter_server
sys.modules['mcp'] = types.ModuleType('mcp')
sys.modules['mcp.server'] = types.ModuleType('mcp.server')
fastmcp_mod = types.ModuleType('mcp.server.fastmcp')
class FastMCP:
def __init__(self, *args, **kwargs):
pass
def tool(self, *args, **kwargs):
def decorator(func):
return func
return decorator
def run(self, *args, **kwargs):
pass
fastmcp_mod.FastMCP = FastMCP
sys.modules['mcp.server.fastmcp'] = fastmcp_mod
# Stub dotenv.load_dotenv
sys.modules['dotenv'] = types.ModuleType('dotenv')
sys.modules['dotenv'].load_dotenv = lambda: None
import jmeter_server
class TestRunJMeter(unittest.IsolatedAsyncioTestCase):
async def test_file_not_found(self):
result = await jmeter_server.run_jmeter("nonexistent.jmx")
self.assertEqual(
result,
"Error: Test file not found: nonexistent.jmx"
)
async def test_invalid_file_type(self):
with tempfile.NamedTemporaryFile(suffix=".txt") as tmp:
result = await jmeter_server.run_jmeter(tmp.name)
self.assertEqual(
result,
f"Error: Invalid file type. Expected .jmx file: {tmp.name}"
)
@mock.patch('jmeter_server.subprocess.run')
async def test_non_gui_success(self, mock_run):
# Prepare a dummy .jmx file
with tempfile.NamedTemporaryFile(suffix=".jmx", delete=False) as tmp:
test_file = tmp.name
# Fake successful subprocess result
class DummyResult:
returncode = 0
stdout = "Success output"
stderr = ""
mock_run.return_value = DummyResult()
result = await jmeter_server.run_jmeter(test_file, non_gui=True)
self.assertEqual(result, "Success output")
os.unlink(test_file)
@mock.patch('jmeter_server.subprocess.run')
async def test_non_gui_failure(self, mock_run):
# Prepare a dummy .jmx file
with tempfile.NamedTemporaryFile(suffix=".jmx", delete=False) as tmp:
test_file = tmp.name
# Fake failing subprocess result
class DummyResult:
returncode = 1
stdout = ""
stderr = "Error occurred"
mock_run.return_value = DummyResult()
result = await jmeter_server.run_jmeter(test_file, non_gui=True)
self.assertEqual(
result,
"Error executing JMeter test:\nError occurred"
)
os.unlink(test_file)
@mock.patch('jmeter_server.subprocess.Popen')
async def test_gui_mode(self, mock_popen):
# Prepare a dummy .jmx file
with tempfile.NamedTemporaryFile(suffix=".jmx", delete=False) as tmp:
test_file = tmp.name
result = await jmeter_server.run_jmeter(test_file, non_gui=False)
self.assertEqual(result, "JMeter GUI launched successfully")
mock_popen.assert_called()
os.unlink(test_file)
@mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
async def test_execute_jmeter_test_default(self, mock_run_jmeter):
mock_run_jmeter.return_value = "wrapped output"
result = await jmeter_server.execute_jmeter_test("file.jmx")
mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=True)
self.assertEqual(result, "wrapped output")
@mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
async def test_execute_jmeter_test_gui(self, mock_run_jmeter):
mock_run_jmeter.return_value = "gui output"
result = await jmeter_server.execute_jmeter_test("file.jmx", gui_mode=True)
mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=False)
self.assertEqual(result, "gui output")
@mock.patch('jmeter_server.run_jmeter', new_callable=mock.AsyncMock)
async def test_execute_jmeter_test_non_gui(self, mock_run_jmeter):
mock_run_jmeter.return_value = "non-gui output"
result = await jmeter_server.execute_jmeter_test_non_gui("file.jmx")
mock_run_jmeter.assert_awaited_with("file.jmx", non_gui=True)
self.assertEqual(result, "non-gui output")
class TestUnexpectedError(unittest.IsolatedAsyncioTestCase):
@mock.patch('jmeter_server.Path.resolve', side_effect=Exception("resolve error"))
async def test_unexpected_error(self, mock_resolve):
result = await jmeter_server.run_jmeter("any.jmx")
self.assertTrue(result.startswith("Unexpected error: resolve error"))
```
--------------------------------------------------------------------------------
/analyzer/parser/csv_parser.py:
--------------------------------------------------------------------------------
```python
"""
CSV parser for JMeter test results.
This module provides functionality for parsing JMeter test results
from JTL files in CSV format using streaming for efficient processing.
"""
import csv
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union
from analyzer.models import Sample, TestResults
from analyzer.parser.base import JTLParser
class CSVJTLParser(JTLParser):
    """Parser for JMeter JTL files in CSV format."""
    # Default column mappings for JMeter CSV output
    DEFAULT_COLUMN_MAPPINGS = {
        'timestamp': 'timeStamp',
        'label': 'label',
        'response_time': 'elapsed',
        'success': 'success',
        'response_code': 'responseCode',
        'error_message': 'responseMessage',
        'thread_name': 'threadName',
        'bytes_received': 'bytes',
        'bytes_sent': 'sentBytes',
        'latency': 'Latency',
        'connect_time': 'Connect'
    }
    def __init__(self, column_mappings: Dict[str, str] = None):
        """Initialize the parser.
        Args:
            column_mappings: Custom column mappings (default: None)
        """
        self.column_mappings = column_mappings or self.DEFAULT_COLUMN_MAPPINGS
    def parse_file(self, file_path: Union[str, Path]) -> TestResults:
        """Parse a JTL file in CSV format.
        Args:
            file_path: Path to the JTL file
        Returns:
            TestResults object containing parsed data
        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is invalid
        """
        path = Path(file_path)
        # Validate file
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        # Detect format
        format_name = self.detect_format(path)
        if format_name != "csv":
            raise ValueError(f"Invalid file format. Expected CSV, got {format_name}")
        # Create test results object
        test_results = TestResults()
        try:
            # Open and parse the CSV file
            with open(path, 'r', newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                # Validate that required columns are present
                if not reader.fieldnames:
                    raise ValueError("CSV file has no header row")
                # Check if we can map all required columns
                missing_columns = []
                column_indices = {}
                for model_field, csv_field in self.column_mappings.items():
                    if csv_field not in reader.fieldnames:
                        missing_columns.append(csv_field)
                    else:
                        column_indices[model_field] = reader.fieldnames.index(csv_field)
                if missing_columns:
                    raise ValueError(f"CSV file is missing required columns: {', '.join(missing_columns)}")
                # Process each row
                for row in reader:
                    try:
                        # Parse timestamp (convert from milliseconds to seconds)
                        ts = int(row[self.column_mappings['timestamp']]) / 1000
                        timestamp = datetime.fromtimestamp(ts)
                        # Parse success (convert string to boolean)
                        success_str = row[self.column_mappings['success']].lower()
                        success = success_str == "true" or success_str == "1"
                        # Create sample
                        sample = Sample(
                            timestamp=timestamp,
                            label=row[self.column_mappings['label']],
                            response_time=int(row[self.column_mappings['response_time']]),
                            success=success,
                            response_code=row[self.column_mappings['response_code']],
                            error_message=row.get(self.column_mappings['error_message'], ""),
                            thread_name=row.get(self.column_mappings['thread_name'], ""),
                            bytes_received=int(row.get(self.column_mappings['bytes_received'], 0)),
                            bytes_sent=int(row.get(self.column_mappings['bytes_sent'], 0)),
                            latency=int(row.get(self.column_mappings['latency'], 0)),
                            connect_time=int(row.get(self.column_mappings['connect_time'], 0))
                        )
                        # Add sample to test results
                        test_results.add_sample(sample)
                    except (ValueError, KeyError) as e:
                        # Log error but continue processing
                        print(f"Error parsing row: {e}")
                        continue
        except Exception as e:
            raise ValueError(f"Error parsing CSV file: {e}")
        return test_results
```
--------------------------------------------------------------------------------
/tests/test_csv_parser.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the CSV JTL parser.
"""
import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from analyzer.parser.csv_parser import CSVJTLParser
class TestCSVJTLParser(unittest.TestCase):
"""Tests for the CSVJTLParser class."""
def setUp(self):
"""Set up test fixtures."""
self.parser = CSVJTLParser()
# Create a sample CSV JTL file
self.csv_content = """timeStamp,elapsed,label,responseCode,success,threadName,bytes,sentBytes,Latency,Connect,responseMessage
1625097600000,1234,Home Page,200,true,Thread Group 1-1,12345,1234,1000,800,
1625097601000,2345,Login Page,200,true,Thread Group 1-1,23456,2345,2000,900,
1625097602000,3456,API Call,500,false,Thread Group 1-2,3456,345,3000,1000,Internal Server Error
"""
self.csv_file = tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False)
self.csv_file.write(self.csv_content)
self.csv_file.close()
def tearDown(self):
"""Tear down test fixtures."""
os.unlink(self.csv_file.name)
def test_parse_file(self):
"""Test parsing a CSV JTL file."""
test_results = self.parser.parse_file(self.csv_file.name)
# Check that we have the correct number of samples
self.assertEqual(len(test_results.samples), 3)
# Check the first sample
sample1 = test_results.samples[0]
self.assertEqual(sample1.label, "Home Page")
self.assertEqual(sample1.response_time, 1234)
self.assertTrue(sample1.success)
self.assertEqual(sample1.response_code, "200")
self.assertEqual(sample1.thread_name, "Thread Group 1-1")
self.assertEqual(sample1.bytes_received, 12345)
self.assertEqual(sample1.bytes_sent, 1234)
self.assertEqual(sample1.latency, 1000)
self.assertEqual(sample1.connect_time, 800)
# Check the third sample (error)
sample3 = test_results.samples[2]
self.assertEqual(sample3.label, "API Call")
self.assertEqual(sample3.response_time, 3456)
self.assertFalse(sample3.success)
self.assertEqual(sample3.response_code, "500")
self.assertEqual(sample3.error_message, "Internal Server Error")
# Check start and end times
expected_start = datetime.fromtimestamp(1625097600)
expected_end = datetime.fromtimestamp(1625097602)
self.assertEqual(test_results.start_time, expected_start)
self.assertEqual(test_results.end_time, expected_end)
def test_file_not_found(self):
"""Test parsing a non-existent file."""
with self.assertRaises(FileNotFoundError):
self.parser.parse_file('/path/to/nonexistent/file.csv')
def test_invalid_format(self):
"""Test parsing a file with invalid format."""
# Create a non-CSV file
with tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False) as tmp:
tmp.write("This is not CSV")
try:
with self.assertRaises(ValueError):
self.parser.parse_file(tmp.name)
finally:
os.unlink(tmp.name)
def test_missing_columns(self):
"""Test parsing a CSV file with missing required columns."""
# Create a CSV file with missing columns
with tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False) as tmp:
tmp.write("timestamp,label,responseCode\n")
tmp.write("1625097600000,Home Page,200\n")
try:
with self.assertRaises(ValueError):
self.parser.parse_file(tmp.name)
finally:
os.unlink(tmp.name)
def test_custom_column_mappings(self):
"""Test parsing a CSV file with custom column mappings."""
# Create a CSV file with different column names but standard format
# to pass the format detection
custom_csv_content = """timeStamp,elapsed,label,responseCode,success,threadName,bytes,sentBytes,Latency,Connect,responseMessage
1625097600000,1234,Home Page,200,true,Thread Group 1-1,12345,1234,1000,800,
"""
with tempfile.NamedTemporaryFile(suffix='.csv', mode='w', delete=False) as tmp:
tmp.write(custom_csv_content)
try:
# Create parser with custom column mappings
custom_mappings = {
'timestamp': 'timeStamp',
'label': 'label',
'response_time': 'elapsed',
'success': 'success',
'response_code': 'responseCode',
'error_message': 'responseMessage',
'thread_name': 'threadName',
'bytes_received': 'bytes',
'bytes_sent': 'sentBytes',
'latency': 'Latency',
'connect_time': 'Connect'
}
custom_parser = CSVJTLParser(column_mappings=custom_mappings)
# This should work with our custom mappings
test_results = custom_parser.parse_file(tmp.name)
self.assertEqual(len(test_results.samples), 1)
self.assertEqual(test_results.samples[0].label, "Home Page")
finally:
os.unlink(tmp.name)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_analyzer_models.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the analyzer models.
"""
import unittest
from datetime import datetime
from analyzer.models import Sample, TestResults, OverallMetrics, EndpointMetrics
class TestSample(unittest.TestCase):
"""Tests for the Sample class."""
def test_sample_creation(self):
"""Test creating a Sample instance."""
timestamp = datetime.now()
sample = Sample(
timestamp=timestamp,
label="Test Sample",
response_time=100,
success=True,
response_code="200",
error_message=None,
thread_name="Thread Group 1-1",
bytes_sent=150,
bytes_received=1024,
latency=50,
connect_time=20
)
self.assertEqual(sample.timestamp, timestamp)
self.assertEqual(sample.label, "Test Sample")
self.assertEqual(sample.response_time, 100)
self.assertTrue(sample.success)
self.assertEqual(sample.response_code, "200")
self.assertIsNone(sample.error_message)
self.assertEqual(sample.thread_name, "Thread Group 1-1")
self.assertEqual(sample.bytes_sent, 150)
self.assertEqual(sample.bytes_received, 1024)
self.assertEqual(sample.latency, 50)
self.assertEqual(sample.connect_time, 20)
class TestTestResults(unittest.TestCase):
"""Tests for the TestResults class."""
def test_add_sample(self):
"""Test adding samples to TestResults."""
results = TestResults()
self.assertEqual(len(results.samples), 0)
# Add a sample
timestamp1 = datetime(2023, 1, 1, 12, 0, 0)
sample1 = Sample(
timestamp=timestamp1,
label="Sample 1",
response_time=100,
success=True,
response_code="200"
)
results.add_sample(sample1)
self.assertEqual(len(results.samples), 1)
self.assertEqual(results.start_time, timestamp1)
self.assertEqual(results.end_time, timestamp1)
# Add another sample with earlier timestamp
timestamp2 = datetime(2023, 1, 1, 11, 0, 0)
sample2 = Sample(
timestamp=timestamp2,
label="Sample 2",
response_time=200,
success=True,
response_code="200"
)
results.add_sample(sample2)
self.assertEqual(len(results.samples), 2)
self.assertEqual(results.start_time, timestamp2) # Should update to earlier time
self.assertEqual(results.end_time, timestamp1)
# Add another sample with later timestamp
timestamp3 = datetime(2023, 1, 1, 13, 0, 0)
sample3 = Sample(
timestamp=timestamp3,
label="Sample 3",
response_time=300,
success=True,
response_code="200"
)
results.add_sample(sample3)
self.assertEqual(len(results.samples), 3)
self.assertEqual(results.start_time, timestamp2)
self.assertEqual(results.end_time, timestamp3) # Should update to later time
class TestMetrics(unittest.TestCase):
"""Tests for the metrics classes."""
def test_overall_metrics(self):
"""Test creating OverallMetrics instance."""
metrics = OverallMetrics(
total_samples=100,
error_count=5,
error_rate=5.0,
average_response_time=250.5,
median_response_time=220.0,
percentile_90=400.0,
percentile_95=450.0,
percentile_99=500.0,
min_response_time=100.0,
max_response_time=600.0,
throughput=10.5,
test_duration=60.0
)
self.assertEqual(metrics.total_samples, 100)
self.assertEqual(metrics.error_count, 5)
self.assertEqual(metrics.error_rate, 5.0)
self.assertEqual(metrics.average_response_time, 250.5)
self.assertEqual(metrics.median_response_time, 220.0)
self.assertEqual(metrics.percentile_90, 400.0)
self.assertEqual(metrics.percentile_95, 450.0)
self.assertEqual(metrics.percentile_99, 500.0)
self.assertEqual(metrics.min_response_time, 100.0)
self.assertEqual(metrics.max_response_time, 600.0)
self.assertEqual(metrics.throughput, 10.5)
self.assertEqual(metrics.test_duration, 60.0)
def test_endpoint_metrics(self):
"""Test creating EndpointMetrics instance."""
metrics = EndpointMetrics(
endpoint="Test Endpoint",
total_samples=50,
error_count=2,
error_rate=4.0,
average_response_time=200.5,
median_response_time=180.0,
percentile_90=350.0,
percentile_95=400.0,
percentile_99=450.0,
min_response_time=90.0,
max_response_time=500.0,
throughput=8.5,
test_duration=60.0
)
self.assertEqual(metrics.endpoint, "Test Endpoint")
self.assertEqual(metrics.total_samples, 50)
self.assertEqual(metrics.error_count, 2)
self.assertEqual(metrics.error_rate, 4.0)
self.assertEqual(metrics.average_response_time, 200.5)
self.assertEqual(metrics.median_response_time, 180.0)
self.assertEqual(metrics.percentile_90, 350.0)
self.assertEqual(metrics.percentile_95, 400.0)
self.assertEqual(metrics.percentile_99, 450.0)
self.assertEqual(metrics.min_response_time, 90.0)
self.assertEqual(metrics.max_response_time, 500.0)
self.assertEqual(metrics.throughput, 8.5)
self.assertEqual(metrics.test_duration, 60.0)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_insights_generator.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the insights generator.
"""
import unittest
from analyzer.insights.generator import InsightsGenerator
from analyzer.models import Bottleneck, Recommendation
class TestInsightsGenerator(unittest.TestCase):
"""Tests for the InsightsGenerator class."""
def setUp(self):
"""Set up test fixtures."""
self.generator = InsightsGenerator()
# Create bottlenecks
self.bottlenecks = [
Bottleneck(
endpoint="Slow Endpoint",
metric_type="response_time",
value=500.0,
threshold=200.0,
severity="high"
),
Bottleneck(
endpoint="Medium Endpoint",
metric_type="response_time",
value=300.0,
threshold=200.0,
severity="medium"
),
Bottleneck(
endpoint="Error Endpoint",
metric_type="error_rate",
value=15.0,
threshold=5.0,
severity="high"
)
]
# Create error analysis
self.error_analysis = {
"error_types": {
"Connection timeout": 10,
"500 Internal Server Error": 5,
"404 Not Found": 3
},
"error_patterns": [
{
"type": "spike",
"timestamp": "2023-01-01T12:00:00",
"error_count": 8
}
]
}
# Create concurrency analysis
self.concurrency_analysis = {
"correlation": 0.85,
"degradation_threshold": 50,
"has_degradation": True
}
def test_generate_bottleneck_recommendations(self):
"""Test generating recommendations for bottlenecks."""
recommendations = self.generator.generate_bottleneck_recommendations(self.bottlenecks)
# We should have at least 2 recommendations (one for response time, one for error rate)
self.assertGreaterEqual(len(recommendations), 2)
# Check that we have recommendations for both types of bottlenecks
recommendation_issues = [r.issue for r in recommendations]
self.assertTrue(any("response time" in issue.lower() for issue in recommendation_issues))
self.assertTrue(any("error rate" in issue.lower() for issue in recommendation_issues))
# Check that recommendations have all required fields
for recommendation in recommendations:
self.assertIsNotNone(recommendation.issue)
self.assertIsNotNone(recommendation.recommendation)
self.assertIsNotNone(recommendation.expected_impact)
self.assertIsNotNone(recommendation.implementation_difficulty)
def test_generate_error_recommendations(self):
"""Test generating recommendations for error patterns."""
recommendations = self.generator.generate_error_recommendations(self.error_analysis)
# We should have at least 3 recommendations (one for each error type)
self.assertGreaterEqual(len(recommendations), 3)
# Check that we have recommendations for the error types
recommendation_issues = [r.issue for r in recommendations]
self.assertTrue(any("timeout" in issue.lower() for issue in recommendation_issues))
self.assertTrue(any("server" in issue.lower() for issue in recommendation_issues))
# Check that recommendations have all required fields
for recommendation in recommendations:
self.assertIsNotNone(recommendation.issue)
self.assertIsNotNone(recommendation.recommendation)
self.assertIsNotNone(recommendation.expected_impact)
self.assertIsNotNone(recommendation.implementation_difficulty)
def test_generate_scaling_insights(self):
"""Test generating insights on scaling behavior."""
insights = self.generator.generate_scaling_insights(self.concurrency_analysis)
# We should have at least 2 insights
self.assertGreaterEqual(len(insights), 2)
# Check that we have insights about correlation and degradation
insight_topics = [i.topic for i in insights]
self.assertTrue(any("correlation" in topic.lower() for topic in insight_topics))
self.assertTrue(any("degradation" in topic.lower() for topic in insight_topics))
# Check that insights have all required fields
for insight in insights:
self.assertIsNotNone(insight.topic)
self.assertIsNotNone(insight.description)
self.assertIsNotNone(insight.supporting_data)
def test_prioritize_recommendations(self):
"""Test prioritizing recommendations."""
# Create some recommendations
recommendations = [
Recommendation(
issue="Critical response time issues",
recommendation="Optimize database queries",
expected_impact="Significant reduction in response times",
implementation_difficulty="medium"
),
Recommendation(
issue="Moderate error rates",
recommendation="Add error handling",
expected_impact="Reduction in error rates",
implementation_difficulty="low"
),
Recommendation(
issue="Minor UI issues",
recommendation="Fix UI bugs",
expected_impact="Improved user experience",
implementation_difficulty="high"
)
]
prioritized = self.generator.prioritize_recommendations(recommendations)
# We should have 3 prioritized recommendations
self.assertEqual(len(prioritized), 3)
# Check that they are sorted by priority score (descending)
self.assertGreaterEqual(prioritized[0]["priority_score"], prioritized[1]["priority_score"])
self.assertGreaterEqual(prioritized[1]["priority_score"], prioritized[2]["priority_score"])
# Check that each prioritized recommendation has the required fields
for item in prioritized:
self.assertIn("recommendation", item)
self.assertIn("priority_score", item)
self.assertIn("priority_level", item)
def test_empty_inputs(self):
"""Test handling of empty inputs."""
self.assertEqual(len(self.generator.generate_bottleneck_recommendations([])), 0)
self.assertEqual(len(self.generator.generate_error_recommendations({})), 0)
self.assertGreaterEqual(len(self.generator.generate_scaling_insights({})), 1) # Should still generate at least one insight
self.assertEqual(len(self.generator.prioritize_recommendations([])), 0)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_bottleneck_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the bottleneck analyzer.
"""
import unittest
from datetime import datetime, timedelta
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.models import EndpointMetrics, TimeSeriesMetrics
class TestBottleneckAnalyzer(unittest.TestCase):
"""Tests for the BottleneckAnalyzer class."""
def setUp(self):
"""Set up test fixtures."""
self.analyzer = BottleneckAnalyzer()
# Create endpoint metrics
self.endpoint_metrics = {
"Fast Endpoint": EndpointMetrics(
endpoint="Fast Endpoint",
total_samples=100,
error_count=0,
error_rate=0.0,
average_response_time=100.0,
median_response_time=95.0,
percentile_90=150.0,
percentile_95=180.0,
percentile_99=200.0,
min_response_time=50.0,
max_response_time=250.0,
throughput=10.0,
test_duration=10.0
),
"Medium Endpoint": EndpointMetrics(
endpoint="Medium Endpoint",
total_samples=100,
error_count=2,
error_rate=2.0,
average_response_time=200.0,
median_response_time=190.0,
percentile_90=300.0,
percentile_95=350.0,
percentile_99=400.0,
min_response_time=100.0,
max_response_time=450.0,
throughput=10.0,
test_duration=10.0
),
"Slow Endpoint": EndpointMetrics(
endpoint="Slow Endpoint",
total_samples=100,
error_count=5,
error_rate=5.0,
average_response_time=500.0,
median_response_time=450.0,
percentile_90=800.0,
percentile_95=900.0,
percentile_99=1000.0,
min_response_time=200.0,
max_response_time=1200.0,
throughput=10.0,
test_duration=10.0
),
"Error Endpoint": EndpointMetrics(
endpoint="Error Endpoint",
total_samples=100,
error_count=15,
error_rate=15.0,
average_response_time=300.0,
median_response_time=280.0,
percentile_90=450.0,
percentile_95=500.0,
percentile_99=600.0,
min_response_time=150.0,
max_response_time=700.0,
throughput=10.0,
test_duration=10.0
)
}
# Create time series metrics
base_time = datetime(2023, 1, 1, 12, 0, 0)
self.time_series_metrics = [
TimeSeriesMetrics(
timestamp=base_time + timedelta(seconds=i * 5),
active_threads=i + 1,
throughput=10.0,
average_response_time=100.0 + i * 20,
error_rate=0.0 if i < 8 else 5.0
)
for i in range(10)
]
# Add an anomaly
self.time_series_metrics[5].average_response_time = 500.0 # Spike in the middle
def test_identify_slow_endpoints(self):
"""Test identifying slow endpoints."""
# Use a higher threshold factor to get only the Slow Endpoint
bottlenecks = self.analyzer.identify_slow_endpoints(self.endpoint_metrics, threshold_factor=2.0)
# We should have identified the slow endpoint
self.assertEqual(len(bottlenecks), 1)
self.assertEqual(bottlenecks[0].endpoint, "Slow Endpoint")
self.assertEqual(bottlenecks[0].metric_type, "response_time")
# With threshold_factor=2.0, the severity should be medium or high
self.assertIn(bottlenecks[0].severity, ["medium", "high"])
# Test with a lower threshold factor to catch more endpoints
bottlenecks = self.analyzer.identify_slow_endpoints(self.endpoint_metrics, threshold_factor=0.8)
self.assertGreaterEqual(len(bottlenecks), 2)
self.assertEqual(bottlenecks[0].endpoint, "Slow Endpoint") # Should still be first
def test_identify_error_prone_endpoints(self):
"""Test identifying error-prone endpoints."""
bottlenecks = self.analyzer.identify_error_prone_endpoints(self.endpoint_metrics, threshold_error_rate=3.0)
# We should have identified both error-prone endpoints
self.assertEqual(len(bottlenecks), 2)
self.assertEqual(bottlenecks[0].endpoint, "Error Endpoint") # Higher error rate should be first
self.assertEqual(bottlenecks[0].metric_type, "error_rate")
self.assertEqual(bottlenecks[0].severity, "high")
self.assertEqual(bottlenecks[1].endpoint, "Slow Endpoint")
self.assertEqual(bottlenecks[1].metric_type, "error_rate")
self.assertEqual(bottlenecks[1].severity, "medium")
# Test with a higher threshold to catch fewer endpoints
bottlenecks = self.analyzer.identify_error_prone_endpoints(self.endpoint_metrics, threshold_error_rate=10.0)
self.assertEqual(len(bottlenecks), 1)
self.assertEqual(bottlenecks[0].endpoint, "Error Endpoint")
def test_detect_anomalies(self):
"""Test detecting response time anomalies."""
anomalies = self.analyzer.detect_anomalies(self.time_series_metrics)
# We should have detected the spike
self.assertGreaterEqual(len(anomalies), 1)
# The spike should be the first anomaly
spike_anomaly = anomalies[0]
self.assertEqual(spike_anomaly.timestamp, datetime(2023, 1, 1, 12, 0, 25)) # 5th interval
self.assertGreater(abs(spike_anomaly.deviation_percentage), 50) # Should be a significant deviation
def test_analyze_concurrency_impact(self):
"""Test analyzing concurrency impact."""
# Our time series has increasing thread counts and response times
analysis = self.analyzer.analyze_concurrency_impact(self.time_series_metrics)
# There should be a positive correlation
self.assertGreater(analysis["correlation"], 0.5)
# Create a new time series with no correlation
no_correlation_series = [
TimeSeriesMetrics(
timestamp=datetime(2023, 1, 1, 12, 0, 0) + timedelta(seconds=i * 5),
active_threads=i + 1,
throughput=10.0,
average_response_time=200.0, # Constant response time
error_rate=0.0
)
for i in range(10)
]
analysis = self.analyzer.analyze_concurrency_impact(no_correlation_series)
self.assertLess(analysis["correlation"], 0.5)
self.assertFalse(analysis["has_degradation"])
def test_empty_inputs(self):
"""Test handling of empty inputs."""
self.assertEqual(len(self.analyzer.identify_slow_endpoints({})), 0)
self.assertEqual(len(self.analyzer.identify_error_prone_endpoints({})), 0)
self.assertEqual(len(self.analyzer.detect_anomalies([])), 0)
analysis = self.analyzer.analyze_concurrency_impact([])
self.assertEqual(analysis["correlation"], 0)
self.assertFalse(analysis["has_degradation"])
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/windsurf_db_reader.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Windsurf LevelDB Reader
A tool to read and explore Windsurf's local storage database.
"""
import json
import sys
from pathlib import Path
try:
import plyvel
except ImportError:
print("plyvel not installed. Install with: pip install plyvel")
sys.exit(1)
class WindsurfDBReader:
def __init__(self, db_path=None):
if db_path is None:
# Default path for Windsurf Next on macOS
home = Path.home()
db_path = home / "Library/Application Support/Windsurf - Next/Local Storage/leveldb"
self.db_path = Path(db_path)
self.db = None
def connect(self):
"""Connect to the LevelDB database"""
try:
self.db = plyvel.DB(str(self.db_path), create_if_missing=False)
print(f"✅ Connected to database: {self.db_path}")
return True
except Exception as e:
print(f"❌ Failed to connect to database: {e}")
return False
def close(self):
"""Close the database connection"""
if self.db:
self.db.close()
print("🔒 Database connection closed")
def list_all_keys(self, limit=50):
"""List all keys in the database"""
if not self.db:
print("❌ Database not connected")
return
print(f"\n📋 Listing up to {limit} keys:")
count = 0
for key, value in self.db:
try:
key_str = key.decode('utf-8', errors='ignore')
value_preview = str(value[:100]) if len(value) > 100 else str(value)
print(f"{count + 1:3d}. Key: {key_str}")
print(f" Value preview: {value_preview}")
print(f" Value size: {len(value)} bytes")
print("-" * 50)
count += 1
if count >= limit:
break
except Exception as e:
print(f"Error reading key {count + 1}: {e}")
count += 1
def search_keys(self, pattern):
"""Search for keys containing a specific pattern"""
if not self.db:
print("❌ Database not connected")
return
print(f"\n🔍 Searching for keys containing '{pattern}':")
found = 0
for key, value in self.db:
try:
key_str = key.decode('utf-8', errors='ignore')
if pattern.lower() in key_str.lower():
print(f"Found: {key_str}")
print(f"Value size: {len(value)} bytes")
# Try to decode value if it looks like JSON
try:
if value.startswith(b'{') or value.startswith(b'['):
json_data = json.loads(value.decode('utf-8'))
print(f"JSON preview: {json.dumps(json_data, indent=2)[:200]}...")
except (UnicodeDecodeError, json.JSONDecodeError):
pass
print("-" * 50)
found += 1
except Exception as e:
print(f"Error searching key: {e}")
if found == 0:
print(f"No keys found containing '{pattern}'")
def get_value(self, key):
"""Get a specific value by key"""
if not self.db:
print("❌ Database not connected")
return None
try:
key_bytes = key.encode('utf-8') if isinstance(key, str) else key
value = self.db.get(key_bytes)
if value is None:
print(f"❌ Key '{key}' not found")
return None
print(f"✅ Found value for key '{key}':")
print(f"Size: {len(value)} bytes")
# Try to decode as JSON
try:
if value.startswith(b'{') or value.startswith(b'['):
json_data = json.loads(value.decode('utf-8'))
print("JSON content:")
print(json.dumps(json_data, indent=2))
return json_data
except (UnicodeDecodeError, json.JSONDecodeError):
pass
# Try to decode as text
try:
text = value.decode('utf-8')
print("Text content:")
print(text)
return text
except UnicodeDecodeError:
print("Binary content (showing first 200 bytes):")
print(value[:200])
return value
except Exception as e:
print(f"❌ Error getting value: {e}")
return None
def export_to_json(self, output_file="windsurf_db_export.json", max_entries=1000):
"""Export database contents to JSON file"""
if not self.db:
print("❌ Database not connected")
return
export_data = {}
count = 0
print(f"📤 Exporting database to {output_file}...")
for key, value in self.db:
if count >= max_entries:
break
try:
key_str = key.decode('utf-8', errors='ignore')
# Try to decode value as JSON first
try:
if value.startswith(b'{') or value.startswith(b'['):
value_data = json.loads(value.decode('utf-8'))
else:
value_data = value.decode('utf-8', errors='ignore')
except (UnicodeDecodeError, json.JSONDecodeError):
value_data = f"<binary data: {len(value)} bytes>"
export_data[key_str] = value_data
count += 1
except Exception as e:
print(f"Error exporting entry {count}: {e}")
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
print(f"✅ Exported {count} entries to {output_file}")
except Exception as e:
print(f"❌ Error writing export file: {e}")
def main():
"""Main function with interactive menu"""
reader = WindsurfDBReader()
if not reader.connect():
return
try:
while True:
print("\n" + "="*60)
print("🌊 Windsurf Database Reader")
print("="*60)
print("1. List all keys (first 50)")
print("2. Search keys by pattern")
print("3. Get value by key")
print("4. Export to JSON")
print("5. Search for 'memory' related keys")
print("6. Search for 'conversation' related keys")
print("0. Exit")
print("-"*60)
choice = input("Enter your choice (0-6): ").strip()
if choice == '0':
break
elif choice == '1':
reader.list_all_keys()
elif choice == '2':
pattern = input("Enter search pattern: ").strip()
if pattern:
reader.search_keys(pattern)
elif choice == '3':
key = input("Enter key: ").strip()
if key:
reader.get_value(key)
elif choice == '4':
filename = input("Enter output filename (default: windsurf_db_export.json): ").strip()
if not filename:
filename = "windsurf_db_export.json"
reader.export_to_json(filename)
elif choice == '5':
reader.search_keys('memory')
elif choice == '6':
reader.search_keys('conversation')
else:
print("❌ Invalid choice. Please try again.")
except KeyboardInterrupt:
print("\n👋 Goodbye!")
finally:
reader.close()
if __name__ == "__main__":
main()
```
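For scripted (non-interactive) use, a minimal sketch along these lines should work, assuming plyvel is installed and the LevelDB directory is not locked by a running Windsurf instance; the output filename is illustrative:
```python
# Minimal non-interactive sketch: connect, search, export, close.
from windsurf_db_reader import WindsurfDBReader

reader = WindsurfDBReader()  # or WindsurfDBReader("/custom/path/to/leveldb")
if reader.connect():
    try:
        reader.search_keys("memory")  # print keys matching a pattern
        reader.export_to_json("windsurf_db_export.json", max_entries=500)
    finally:
        reader.close()
```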
--------------------------------------------------------------------------------
/tests/test_metrics_calculator.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the metrics calculator.
"""
import unittest
from datetime import datetime, timedelta
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.models import Sample, TestResults
class TestMetricsCalculator(unittest.TestCase):
"""Tests for the MetricsCalculator class."""
def setUp(self):
"""Set up test fixtures."""
self.calculator = MetricsCalculator()
# Create test results with samples
self.test_results = TestResults()
# Add samples for endpoint 1
base_time = datetime(2023, 1, 1, 12, 0, 0)
for i in range(10):
sample = Sample(
timestamp=base_time + timedelta(seconds=i),
label="Endpoint1",
response_time=100 + i * 10, # 100, 110, 120, ..., 190
success=True,
response_code="200"
)
self.test_results.add_sample(sample)
# Add samples for endpoint 2 (including some errors)
for i in range(5):
sample = Sample(
timestamp=base_time + timedelta(seconds=i + 10),
label="Endpoint2",
response_time=200 + i * 20, # 200, 220, 240, 260, 280
success=i < 4, # Last one is an error
response_code="200" if i < 4 else "500",
error_message="" if i < 4 else "Internal Server Error"
)
self.test_results.add_sample(sample)
def test_calculate_overall_metrics(self):
"""Test calculating overall metrics."""
metrics = self.calculator.calculate_overall_metrics(self.test_results)
# Check basic metrics
self.assertEqual(metrics.total_samples, 15)
self.assertEqual(metrics.error_count, 1)
self.assertAlmostEqual(metrics.error_rate, 100 * 1/15)
# Check response time metrics
expected_response_times = [100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 220, 240, 260, 280]
self.assertAlmostEqual(metrics.average_response_time, sum(expected_response_times) / len(expected_response_times))
self.assertAlmostEqual(metrics.median_response_time, 170) # Median of the 15 values
self.assertAlmostEqual(metrics.min_response_time, 100)
self.assertAlmostEqual(metrics.max_response_time, 280)
# Check percentiles
self.assertAlmostEqual(metrics.percentile_90, 260)
self.assertAlmostEqual(metrics.percentile_95, 270)
self.assertAlmostEqual(metrics.percentile_99, 278)
# Check throughput and duration
self.assertEqual(metrics.test_duration, 14) # 14 seconds from first to last sample
self.assertAlmostEqual(metrics.throughput, 15 / 14) # 15 samples over 14 seconds
def test_calculate_endpoint_metrics(self):
"""Test calculating endpoint-specific metrics."""
endpoint_metrics = self.calculator.calculate_endpoint_metrics(self.test_results)
# Check that we have metrics for both endpoints
self.assertEqual(len(endpoint_metrics), 2)
self.assertIn("Endpoint1", endpoint_metrics)
self.assertIn("Endpoint2", endpoint_metrics)
# Check metrics for endpoint 1
metrics1 = endpoint_metrics["Endpoint1"]
self.assertEqual(metrics1.endpoint, "Endpoint1")
self.assertEqual(metrics1.total_samples, 10)
self.assertEqual(metrics1.error_count, 0)
self.assertEqual(metrics1.error_rate, 0)
self.assertAlmostEqual(metrics1.average_response_time, 145) # Average of 100, 110, ..., 190
# Check metrics for endpoint 2
metrics2 = endpoint_metrics["Endpoint2"]
self.assertEqual(metrics2.endpoint, "Endpoint2")
self.assertEqual(metrics2.total_samples, 5)
self.assertEqual(metrics2.error_count, 1)
self.assertAlmostEqual(metrics2.error_rate, 20) # 1 error out of 5 samples
self.assertAlmostEqual(metrics2.average_response_time, 240) # Average of 200, 220, 240, 260, 280
def test_calculate_time_series_metrics(self):
"""Test calculating time series metrics."""
# Use a 5-second interval
time_series = self.calculator.calculate_time_series_metrics(self.test_results, interval_seconds=5)
# We should have 3 intervals: 0-5s, 5-10s, 10-15s
self.assertEqual(len(time_series), 3)
# Check first interval (0-5s)
self.assertEqual(time_series[0].timestamp, datetime(2023, 1, 1, 12, 0, 0))
self.assertEqual(time_series[0].active_threads, 0) # No thread names in our test data
self.assertAlmostEqual(time_series[0].throughput, 5 / 5) # 5 samples over 5 seconds
self.assertAlmostEqual(time_series[0].average_response_time, (100 + 110 + 120 + 130 + 140) / 5)
self.assertEqual(time_series[0].error_rate, 0) # No errors in first interval
# Check third interval (10-15s)
self.assertEqual(time_series[2].timestamp, datetime(2023, 1, 1, 12, 0, 10))
self.assertAlmostEqual(time_series[2].throughput, 5 / 5) # 5 samples over 5 seconds
self.assertAlmostEqual(time_series[2].average_response_time, (200 + 220 + 240 + 260 + 280) / 5)
self.assertAlmostEqual(time_series[2].error_rate, 20) # 1 error out of 5 samples
def test_compare_with_benchmarks(self):
"""Test comparing metrics with benchmarks."""
# Calculate metrics
metrics = self.calculator.calculate_overall_metrics(self.test_results)
# Define benchmarks
benchmarks = {
"average_response_time": 150,
"error_rate": 0,
"throughput": 2
}
# Compare with benchmarks
comparison = self.calculator.compare_with_benchmarks(metrics, benchmarks)
# Check comparison results
self.assertIn("average_response_time", comparison)
self.assertIn("error_rate", comparison)
self.assertIn("throughput", comparison)
# Check average_response_time comparison
avg_rt_comp = comparison["average_response_time"]
self.assertEqual(avg_rt_comp["benchmark"], 150)
self.assertAlmostEqual(avg_rt_comp["actual"], metrics.average_response_time)
self.assertAlmostEqual(avg_rt_comp["difference"], metrics.average_response_time - 150)
self.assertAlmostEqual(avg_rt_comp["percent_difference"],
(metrics.average_response_time - 150) / 150 * 100)
# Check error_rate comparison
error_rate_comp = comparison["error_rate"]
self.assertEqual(error_rate_comp["benchmark"], 0)
self.assertAlmostEqual(error_rate_comp["actual"], metrics.error_rate)
self.assertAlmostEqual(error_rate_comp["difference"], metrics.error_rate)
self.assertEqual(error_rate_comp["percent_difference"], float('inf')) # Division by zero
# Check throughput comparison
throughput_comp = comparison["throughput"]
self.assertEqual(throughput_comp["benchmark"], 2)
self.assertAlmostEqual(throughput_comp["actual"], metrics.throughput)
self.assertAlmostEqual(throughput_comp["difference"], metrics.throughput - 2)
self.assertAlmostEqual(throughput_comp["percent_difference"],
(metrics.throughput - 2) / 2 * 100)
def test_empty_results(self):
"""Test calculating metrics for empty test results."""
empty_results = TestResults()
with self.assertRaises(ValueError):
self.calculator.calculate_overall_metrics(empty_results)
with self.assertRaises(ValueError):
self.calculator.calculate_endpoint_metrics(empty_results)
with self.assertRaises(ValueError):
self.calculator.calculate_time_series_metrics(empty_results)
def test_invalid_interval(self):
"""Test calculating time series metrics with invalid interval."""
with self.assertRaises(ValueError):
self.calculator.calculate_time_series_metrics(self.test_results, interval_seconds=0)
with self.assertRaises(ValueError):
self.calculator.calculate_time_series_metrics(self.test_results, interval_seconds=-1)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/jmeter_report.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html>
<head>
<title>JMeter Test Results Analysis</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h1, h2, h3 { color: #333; }
table { border-collapse: collapse; width: 100%; margin-bottom: 20px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
tr:nth-child(even) { background-color: #f9f9f9; }
.chart { margin: 20px 0; max-width: 100%; }
.section { margin-bottom: 30px; }
.severity-high { color: #d9534f; }
.severity-medium { color: #f0ad4e; }
.severity-low { color: #5bc0de; }
</style>
</head>
<body>
<h1>JMeter Test Results Analysis</h1>
<div class="section">
<h2>Summary</h2>
<table>
<tr><th>Metric</th><th>Value</th></tr>
<tr><td>Total Samples</td><td>96</td></tr>
<tr><td>Error Count</td><td>0</td></tr>
<tr><td>Error Rate</td><td>0.00%</td></tr>
<tr><td>Average Response Time</td><td>222.18 ms</td></tr>
<tr><td>Median Response Time</td><td>209.00 ms</td></tr>
<tr><td>90th Percentile</td><td>335.50 ms</td></tr>
<tr><td>95th Percentile</td><td>345.25 ms</td></tr>
<tr><td>99th Percentile</td><td>356.00 ms</td></tr>
<tr><td>Min Response Time</td><td>105.00 ms</td></tr>
<tr><td>Max Response Time</td><td>356.00 ms</td></tr>
<tr><td>Throughput</td><td>0.01 requests/second</td></tr>
<tr><td>Start Time</td><td>2025-04-09 20:38:16.160000</td></tr>
<tr><td>End Time</td><td>2025-04-09 23:17:02.381000</td></tr>
<tr><td>Duration</td><td>9526.22 seconds</td></tr>
</table>
</div>
<div class="section">
<h2>Endpoint Analysis</h2>
<table>
<tr>
<th>Endpoint</th>
<th>Samples</th>
<th>Errors</th>
<th>Error Rate</th>
<th>Avg Response Time</th>
<th>95th Percentile</th>
<th>Throughput</th>
</tr>
<tr>
<td>Login as u1</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>174.62 ms</td>
<td>271.10 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action = none</td>
<td>16</td>
<td>0</td>
<td>0.00%</td>
<td>253.12 ms</td>
<td>350.25 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action a</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>235.25 ms</td>
<td>327.15 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action b</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>224.00 ms</td>
<td>317.00 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action c</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>231.00 ms</td>
<td>349.35 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action d</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>215.50 ms</td>
<td>270.65 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Logout</td>
<td>16</td>
<td>0</td>
<td>0.00%</td>
<td>214.56 ms</td>
<td>356.00 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Action = &lt;EOF&gt;</td>
<td>16</td>
<td>0</td>
<td>0.00%</td>
<td>224.12 ms</td>
<td>341.00 ms</td>
<td>0.00 req/s</td>
</tr>
<tr>
<td>Login as u2</td>
<td>8</td>
<td>0</td>
<td>0.00%</td>
<td>202.12 ms</td>
<td>297.30 ms</td>
<td>0.00 req/s</td>
</tr>
</table>
</div>
<div class="section">
<h2>Bottleneck Analysis</h2>
<h3>Slow Endpoints</h3>
<table>
<tr>
<th>Endpoint</th>
<th>Response Time</th>
<th>Threshold</th>
<th>Severity</th>
</tr>
<tr>
<td>Logout</td>
<td>356.00 ms</td>
<td>329.05 ms</td>
<td class="severity-low">LOW</td>
</tr>
<tr>
<td>Action = none</td>
<td>350.25 ms</td>
<td>329.05 ms</td>
<td class="severity-low">LOW</td>
</tr>
<tr>
<td>Action c</td>
<td>349.35 ms</td>
<td>329.05 ms</td>
<td class="severity-low">LOW</td>
</tr>
<tr>
<td>Action = &lt;EOF&gt;</td>
<td>341.00 ms</td>
<td>329.05 ms</td>
<td class="severity-low">LOW</td>
</tr>
</table>
</div>
<div class="section">
<h2>Insights and Recommendations</h2>
<h3>Scaling Insights</h3>
<table>
<tr>
<th>Topic</th>
<th>Description</th>
</tr>
<tr>
<td>No Correlation with Concurrency</td>
<td>There is little to no correlation between the number of concurrent users and response times, suggesting good scalability</td>
</tr>
<tr>
<td>No Performance Degradation Detected</td>
<td>No significant performance degradation was detected with increasing concurrent users within the tested range</td>
</tr>
</table>
</div>
</body>
</html>
```
--------------------------------------------------------------------------------
/windsurf_db_reader_alternative.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Windsurf LevelDB Reader - Alternative Implementation
A tool to read and explore Windsurf's local storage database using pure Python.
"""
import json
from pathlib import Path
class SimpleLevelDBReader:
"""
A simple LevelDB reader that can extract basic data without the full LevelDB library.
This is a simplified approach that may not work for all LevelDB files, but it can
extract readable data in many cases.
"""
def __init__(self, db_path=None):
if db_path is None:
# Default path for Windsurf Next on macOS
home = Path.home()
db_path = home / "Library/Application Support/Windsurf - Next/Local Storage/leveldb"
self.db_path = Path(db_path)
def read_ldb_files(self):
"""Read .ldb files and try to extract readable data"""
if not self.db_path.exists():
print(f"❌ Database path does not exist: {self.db_path}")
return []
ldb_files = list(self.db_path.glob("*.ldb"))
if not ldb_files:
print("❌ No .ldb files found")
return []
print(f"📁 Found {len(ldb_files)} .ldb files")
all_data = []
for ldb_file in ldb_files:
print(f"📖 Reading {ldb_file.name}...")
data = self._extract_strings_from_ldb(ldb_file)
all_data.extend(data)
return all_data
def _extract_strings_from_ldb(self, file_path):
"""Extract readable strings from an LDB file"""
extracted_data = []
try:
with open(file_path, 'rb') as f:
content = f.read()
# Look for JSON-like structures and readable strings
current_string = ""
in_string = False
for i, byte in enumerate(content):
char = chr(byte) if 32 <= byte <= 126 else None # Printable ASCII
if char:
current_string += char
in_string = True
else:
if in_string and len(current_string) > 10: # Only keep longer strings
# Check if it looks like JSON or contains useful data
if (current_string.startswith('{') or
current_string.startswith('[') or
'memory' in current_string.lower() or
'conversation' in current_string.lower() or
'windsurf' in current_string.lower()):
extracted_data.append({
'file': file_path.name,
'offset': i - len(current_string),
'content': current_string,
'type': self._guess_content_type(current_string)
})
current_string = ""
in_string = False
# Don't forget the last string
if in_string and len(current_string) > 10:
extracted_data.append({
'file': file_path.name,
'offset': len(content) - len(current_string),
'content': current_string,
'type': self._guess_content_type(current_string)
})
except Exception as e:
print(f"❌ Error reading {file_path}: {e}")
return extracted_data
def _guess_content_type(self, content):
"""Guess the type of content"""
content_lower = content.lower()
if content.startswith('{') and content.endswith('}'):
return 'json_object'
elif content.startswith('[') and content.endswith(']'):
return 'json_array'
elif 'memory' in content_lower:
return 'memory_related'
elif 'conversation' in content_lower:
return 'conversation_related'
elif any(keyword in content_lower for keyword in ['windsurf', 'cascade', 'user', 'assistant']):
return 'windsurf_related'
else:
return 'text'
def search_data(self, data, pattern):
"""Search extracted data for a pattern"""
results = []
pattern_lower = pattern.lower()
for item in data:
if pattern_lower in item['content'].lower():
results.append(item)
return results
def export_data(self, data, output_file="windsurf_extracted_data.json"):
"""Export extracted data to JSON"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"✅ Exported {len(data)} items to {output_file}")
except Exception as e:
print(f"❌ Error exporting data: {e}")
def analyze_data(self, data):
"""Analyze the extracted data"""
if not data:
print("❌ No data to analyze")
return
print(f"\n📊 Analysis of {len(data)} extracted items:")
print("-" * 50)
# Count by type
type_counts = {}
for item in data:
item_type = item['type']
type_counts[item_type] = type_counts.get(item_type, 0) + 1
print("📈 Content types:")
for content_type, count in sorted(type_counts.items()):
print(f" {content_type}: {count}")
# Count by file
file_counts = {}
for item in data:
file_name = item['file']
file_counts[file_name] = file_counts.get(file_name, 0) + 1
print(f"\n📁 Items per file:")
for file_name, count in sorted(file_counts.items()):
print(f" {file_name}: {count}")
# Show some examples
print(f"\n🔍 Sample content:")
for i, item in enumerate(data[:5]): # Show first 5 items
content_preview = item['content'][:100] + "..." if len(item['content']) > 100 else item['content']
print(f" {i+1}. [{item['type']}] {content_preview}")
def main():
"""Main function with interactive menu"""
reader = SimpleLevelDBReader()
print("🌊 Windsurf Database Reader (Alternative)")
print("=" * 60)
print("This tool extracts readable strings from LevelDB files.")
print("It may not capture all data but can find JSON and text content.")
print("=" * 60)
try:
while True:
print("\nOptions:")
print("1. Extract all readable data")
print("2. Search for specific pattern")
print("3. Analyze extracted data")
print("4. Export data to JSON")
print("0. Exit")
print("-" * 40)
choice = input("Enter your choice (0-4): ").strip()
if choice == '0':
break
elif choice == '1':
print("🔄 Extracting data from LevelDB files...")
data = reader.read_ldb_files()
if data:
reader.analyze_data(data)
# Store data for other operations
globals()['extracted_data'] = data
else:
print("❌ No readable data found")
elif choice == '2':
if 'extracted_data' not in globals():
print("❌ Please extract data first (option 1)")
continue
pattern = input("Enter search pattern: ").strip()
if pattern:
results = reader.search_data(globals()['extracted_data'], pattern)
print(f"\n🔍 Found {len(results)} matches for '{pattern}':")
for i, item in enumerate(results[:10]): # Show first 10 matches
content_preview = item['content'][:200] + "..." if len(item['content']) > 200 else item['content']
print(f"\n{i+1}. File: {item['file']}, Type: {item['type']}")
print(f"Content: {content_preview}")
print("-" * 40)
elif choice == '3':
if 'extracted_data' not in globals():
print("❌ Please extract data first (option 1)")
continue
reader.analyze_data(globals()['extracted_data'])
elif choice == '4':
if 'extracted_data' not in globals():
print("❌ Please extract data first (option 1)")
continue
filename = input("Enter output filename (default: windsurf_extracted_data.json): ").strip()
if not filename:
filename = "windsurf_extracted_data.json"
reader.export_data(globals()['extracted_data'], filename)
else:
print("❌ Invalid choice. Please try again.")
except KeyboardInterrupt:
print("\n👋 Goodbye!")
if __name__ == "__main__":
main()
```
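As with the plyvel-based reader, the alternative reader can be driven without the menu; a minimal sketch (the output filename is illustrative):
```python
# Minimal non-interactive sketch for the pure-Python string extractor.
from windsurf_db_reader_alternative import SimpleLevelDBReader

reader = SimpleLevelDBReader()  # or SimpleLevelDBReader("/custom/path/to/leveldb")
data = reader.read_ldb_files()  # list of {'file', 'offset', 'content', 'type'} dicts
if data:
    reader.analyze_data(data)                           # type/file breakdown plus samples
    matches = reader.search_data(data, "conversation")  # substring filter
    reader.export_data(matches, "conversation_matches.json")
```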
--------------------------------------------------------------------------------
/analyzer/metrics/calculator.py:
--------------------------------------------------------------------------------
```python
"""
Metrics calculator for JMeter test results.
This module provides functionality for calculating performance metrics
from JMeter test results, including overall metrics, endpoint-specific metrics,
and time series metrics.
"""
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from analyzer.models import (EndpointMetrics, OverallMetrics, Sample,
TestResults, TimeSeriesMetrics)
class MetricsCalculator:
"""Calculator for performance metrics from test results."""
def calculate_overall_metrics(self, test_results: TestResults) -> OverallMetrics:
"""Calculate overall metrics for the entire test.
Args:
test_results: TestResults object containing samples
Returns:
OverallMetrics object with calculated metrics
Raises:
ValueError: If test_results contains no samples
"""
if not test_results.samples:
raise ValueError("Cannot calculate metrics for empty test results")
# Extract response times and success status
response_times = [sample.response_time for sample in test_results.samples]
success_count = sum(1 for sample in test_results.samples if sample.success)
error_count = len(test_results.samples) - success_count
# Calculate duration
if test_results.start_time and test_results.end_time:
duration = (test_results.end_time - test_results.start_time).total_seconds()
else:
duration = 0
# Calculate throughput (requests per second)
throughput = len(test_results.samples) / duration if duration > 0 else 0
# Calculate percentiles
response_times_sorted = sorted(response_times)
# Create metrics object
metrics = OverallMetrics(
total_samples=len(test_results.samples),
error_count=error_count,
error_rate=(error_count / len(test_results.samples)) * 100 if test_results.samples else 0,
average_response_time=statistics.mean(response_times) if response_times else 0,
median_response_time=statistics.median(response_times) if response_times else 0,
percentile_90=self._calculate_percentile(response_times_sorted, 90),
percentile_95=self._calculate_percentile(response_times_sorted, 95),
percentile_99=self._calculate_percentile(response_times_sorted, 99),
min_response_time=min(response_times) if response_times else 0,
max_response_time=max(response_times) if response_times else 0,
throughput=throughput,
test_duration=duration
)
return metrics
def calculate_endpoint_metrics(self, test_results: TestResults) -> Dict[str, EndpointMetrics]:
"""Calculate metrics broken down by endpoint/sampler.
Args:
test_results: TestResults object containing samples
Returns:
Dictionary mapping endpoint names to EndpointMetrics objects
Raises:
ValueError: If test_results contains no samples
"""
if not test_results.samples:
raise ValueError("Cannot calculate metrics for empty test results")
# Group samples by endpoint
endpoints = defaultdict(list)
for sample in test_results.samples:
endpoints[sample.label].append(sample)
# Calculate metrics for each endpoint
endpoint_metrics = {}
for endpoint, samples in endpoints.items():
# Create a temporary TestResults object with only samples for this endpoint
temp_results = TestResults()
for sample in samples:
temp_results.add_sample(sample)
# Calculate overall metrics for this endpoint
overall_metrics = self.calculate_overall_metrics(temp_results)
# Create endpoint metrics
metrics = EndpointMetrics(
endpoint=endpoint,
total_samples=overall_metrics.total_samples,
error_count=overall_metrics.error_count,
error_rate=overall_metrics.error_rate,
average_response_time=overall_metrics.average_response_time,
median_response_time=overall_metrics.median_response_time,
percentile_90=overall_metrics.percentile_90,
percentile_95=overall_metrics.percentile_95,
percentile_99=overall_metrics.percentile_99,
min_response_time=overall_metrics.min_response_time,
max_response_time=overall_metrics.max_response_time,
throughput=overall_metrics.throughput,
test_duration=overall_metrics.test_duration
)
endpoint_metrics[endpoint] = metrics
return endpoint_metrics
def calculate_time_series_metrics(self, test_results: TestResults,
interval_seconds: int = 5) -> List[TimeSeriesMetrics]:
"""Calculate metrics over time using the specified interval.
Args:
test_results: TestResults object containing samples
interval_seconds: Time interval in seconds (default: 5)
Returns:
List of TimeSeriesMetrics objects, one for each interval
Raises:
ValueError: If test_results contains no samples or if interval_seconds <= 0
"""
if not test_results.samples:
raise ValueError("Cannot calculate metrics for empty test results")
if interval_seconds <= 0:
raise ValueError("Interval must be positive")
if not test_results.start_time or not test_results.end_time:
raise ValueError("Test results must have start and end times")
# Create time intervals
start_time = test_results.start_time
end_time = test_results.end_time
# Ensure we have at least one interval
if (end_time - start_time).total_seconds() < interval_seconds:
end_time = start_time + timedelta(seconds=interval_seconds)
intervals = []
current_time = start_time
while current_time < end_time:
next_time = current_time + timedelta(seconds=interval_seconds)
intervals.append((current_time, next_time))
current_time = next_time
# Group samples by interval
interval_samples = [[] for _ in range(len(intervals))]
for sample in test_results.samples:
for i, (start, end) in enumerate(intervals):
if start <= sample.timestamp < end:
interval_samples[i].append(sample)
break
# Calculate metrics for each interval
time_series_metrics = []
for i, (start, end) in enumerate(intervals):
samples = interval_samples[i]
# Skip intervals with no samples
if not samples:
continue
# Calculate metrics for this interval
response_times = [sample.response_time for sample in samples]
error_count = sum(1 for sample in samples if not sample.success)
# Count active threads (approximation based on unique thread names)
thread_names = set(sample.thread_name for sample in samples if sample.thread_name)
active_threads = len(thread_names)
# Calculate throughput for this interval
interval_duration = (end - start).total_seconds()
throughput = len(samples) / interval_duration if interval_duration > 0 else 0
# Create metrics object
metrics = TimeSeriesMetrics(
timestamp=start,
active_threads=active_threads,
throughput=throughput,
average_response_time=statistics.mean(response_times) if response_times else 0,
error_rate=(error_count / len(samples)) * 100 if samples else 0
)
time_series_metrics.append(metrics)
return time_series_metrics
def compare_with_benchmarks(self, metrics: OverallMetrics,
benchmarks: Dict[str, float]) -> Dict[str, Dict[str, float]]:
"""Compare metrics with benchmarks.
Args:
metrics: OverallMetrics object
benchmarks: Dictionary mapping metric names to benchmark values
Returns:
Dictionary with comparison results
"""
comparison = {}
for metric_name, benchmark_value in benchmarks.items():
if hasattr(metrics, metric_name):
actual_value = getattr(metrics, metric_name)
difference = actual_value - benchmark_value
percent_difference = (difference / benchmark_value) * 100 if benchmark_value != 0 else float('inf')
comparison[metric_name] = {
'benchmark': benchmark_value,
'actual': actual_value,
'difference': difference,
'percent_difference': percent_difference
}
return comparison
def _calculate_percentile(self, sorted_values: List[float], percentile: float) -> float:
"""Calculate a percentile from sorted values.
Args:
sorted_values: List of values, sorted in ascending order
percentile: Percentile to calculate (0-100)
Returns:
Percentile value
"""
if not sorted_values:
return 0
# Calculate percentile index
index = (percentile / 100) * (len(sorted_values) - 1)
# If index is an integer, return the value at that index
if index.is_integer():
return sorted_values[int(index)]
# Otherwise, interpolate between the two nearest values
lower_index = math.floor(index)
upper_index = math.ceil(index)
lower_value = sorted_values[lower_index]
upper_value = sorted_values[upper_index]
fraction = index - lower_index
return lower_value + (upper_value - lower_value) * fraction
```
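A minimal sketch of driving `MetricsCalculator` by hand, assuming the `Sample`/`TestResults` constructors used in `tests/test_metrics_calculator.py`. Note that `_calculate_percentile` interpolates linearly: for the sorted values [10, 20, 30, 40], the 90th percentile index is 0.9 * 3 = 2.7, which evaluates to 30 + 0.7 * (40 - 30) = 37.
```python
# Minimal sketch: build a small TestResults by hand and compute metrics from it.
from datetime import datetime, timedelta

from analyzer.metrics.calculator import MetricsCalculator
from analyzer.models import Sample, TestResults

results = TestResults()
start = datetime(2023, 1, 1, 12, 0, 0)
for i in range(10):
    results.add_sample(Sample(
        timestamp=start + timedelta(seconds=i),
        label="Login",
        response_time=100 + i * 10,  # 100, 110, ..., 190 ms
        success=True,
        response_code="200",
    ))

calculator = MetricsCalculator()
overall = calculator.calculate_overall_metrics(results)
print(overall.average_response_time, overall.percentile_95, overall.throughput)

# Benchmark comparison reports the delta and percent difference per metric.
comparison = calculator.compare_with_benchmarks(overall, {"average_response_time": 150})
print(comparison["average_response_time"]["percent_difference"])
```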
--------------------------------------------------------------------------------
/analyzer/bottleneck/analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Bottleneck analyzer for JMeter test results.
This module provides functionality for identifying performance bottlenecks
in JMeter test results, including slow endpoints, error-prone endpoints,
response time anomalies, and concurrency impact analysis.
"""
import statistics
from typing import Dict, List, Optional, Tuple
from analyzer.models import (Anomaly, Bottleneck, EndpointMetrics,
OverallMetrics, Sample, TestResults,
TimeSeriesMetrics)
class BottleneckAnalyzer:
"""Analyzer for identifying performance bottlenecks."""
def identify_slow_endpoints(self, endpoint_metrics: Dict[str, EndpointMetrics],
threshold_percentile: float = 95,
threshold_factor: float = 1.5) -> List[Bottleneck]:
"""Identify endpoints with the highest response times.
Args:
endpoint_metrics: Dictionary mapping endpoint names to EndpointMetrics objects
threshold_percentile: Percentile to use for response time threshold (default: 95)
threshold_factor: Factor to multiply the average response time by (default: 1.5)
Returns:
List of Bottleneck objects for slow endpoints
"""
if not endpoint_metrics:
return []
# Calculate average response time across all endpoints
avg_response_times = [metrics.average_response_time for metrics in endpoint_metrics.values()]
overall_avg_response_time = statistics.mean(avg_response_times) if avg_response_times else 0
# Calculate threshold
threshold = overall_avg_response_time * threshold_factor
# Identify slow endpoints
bottlenecks = []
for endpoint, metrics in endpoint_metrics.items():
# Get the response time at the specified percentile
percentile_rt = getattr(metrics, f"percentile_{int(threshold_percentile)}", metrics.average_response_time)
# Check if the endpoint is slow
if percentile_rt > threshold:
# Determine severity based on how much it exceeds the threshold
if percentile_rt > threshold * 2:
severity = "high"
elif percentile_rt > threshold * 1.5:
severity = "medium"
else:
severity = "low"
bottleneck = Bottleneck(
endpoint=endpoint,
metric_type="response_time",
value=percentile_rt,
threshold=threshold,
severity=severity
)
bottlenecks.append(bottleneck)
# Sort bottlenecks by severity and then by value (descending)
severity_order = {"high": 0, "medium": 1, "low": 2}
bottlenecks.sort(key=lambda b: (severity_order.get(b.severity, 3), -b.value))
return bottlenecks
def identify_error_prone_endpoints(self, endpoint_metrics: Dict[str, EndpointMetrics],
threshold_error_rate: float = 1.0) -> List[Bottleneck]:
"""Identify endpoints with the highest error rates.
Args:
endpoint_metrics: Dictionary mapping endpoint names to EndpointMetrics objects
threshold_error_rate: Minimum error rate to consider as a bottleneck (default: 1.0%)
Returns:
List of Bottleneck objects for error-prone endpoints
"""
if not endpoint_metrics:
return []
# Identify error-prone endpoints
bottlenecks = []
for endpoint, metrics in endpoint_metrics.items():
# Skip endpoints with no errors
if metrics.error_count == 0:
continue
# Check if the endpoint has a high error rate
if metrics.error_rate >= threshold_error_rate:
# Determine severity based on error rate
if metrics.error_rate >= 10.0:
severity = "high"
elif metrics.error_rate >= 5.0:
severity = "medium"
else:
severity = "low"
bottleneck = Bottleneck(
endpoint=endpoint,
metric_type="error_rate",
value=metrics.error_rate,
threshold=threshold_error_rate,
severity=severity
)
bottlenecks.append(bottleneck)
# Sort bottlenecks by severity and then by value (descending)
severity_order = {"high": 0, "medium": 1, "low": 2}
bottlenecks.sort(key=lambda b: (severity_order.get(b.severity, 3), -b.value))
return bottlenecks
def detect_anomalies(self, time_series_metrics: List[TimeSeriesMetrics],
z_score_threshold: float = 2.0) -> List[Anomaly]:
"""Detect response time anomalies and outliers.
Args:
time_series_metrics: List of TimeSeriesMetrics objects
z_score_threshold: Z-score threshold for anomaly detection (default: 2.0)
Returns:
List of Anomaly objects
"""
if not time_series_metrics:
return []
# Extract response times
response_times = [metrics.average_response_time for metrics in time_series_metrics]
# Calculate mean and standard deviation
mean_rt = statistics.mean(response_times)
stdev_rt = statistics.stdev(response_times) if len(response_times) > 1 else 0
# Detect anomalies
anomalies = []
for metrics in time_series_metrics:
# Skip if standard deviation is zero (all values are the same)
if stdev_rt == 0:
continue
# Calculate z-score
z_score = (metrics.average_response_time - mean_rt) / stdev_rt
# Check if the response time is an anomaly
if abs(z_score) >= z_score_threshold:
# Calculate deviation percentage
deviation_percentage = ((metrics.average_response_time - mean_rt) / mean_rt) * 100
anomaly = Anomaly(
timestamp=metrics.timestamp,
endpoint="overall", # Overall anomaly, not endpoint-specific
expected_value=mean_rt,
actual_value=metrics.average_response_time,
deviation_percentage=deviation_percentage
)
anomalies.append(anomaly)
# Sort anomalies by deviation percentage (descending)
anomalies.sort(key=lambda a: abs(a.deviation_percentage), reverse=True)
return anomalies
def analyze_concurrency_impact(self, time_series_metrics: List[TimeSeriesMetrics]) -> Dict:
"""Analyze the impact of concurrency on performance.
Args:
time_series_metrics: List of TimeSeriesMetrics objects
Returns:
Dictionary containing concurrency analysis results
"""
if not time_series_metrics:
return {"correlation": 0, "degradation_threshold": 0, "has_degradation": False}
# Extract thread counts and response times
thread_counts = [metrics.active_threads for metrics in time_series_metrics]
response_times = [metrics.average_response_time for metrics in time_series_metrics]
# Skip if there's no variation in thread counts
if len(set(thread_counts)) <= 1:
return {"correlation": 0, "degradation_threshold": 0, "has_degradation": False}
# Calculate correlation between thread count and response time
try:
correlation = self._calculate_correlation(thread_counts, response_times)
except (ValueError, ZeroDivisionError):
correlation = 0
# Identify potential degradation threshold
degradation_threshold = 0
has_degradation = False
if correlation > 0.5: # Strong positive correlation
# Group by thread count
thread_rt_map = {}
for metrics in time_series_metrics:
if metrics.active_threads not in thread_rt_map:
thread_rt_map[metrics.active_threads] = []
thread_rt_map[metrics.active_threads].append(metrics.average_response_time)
# Calculate average response time for each thread count
thread_avg_rt = {
threads: statistics.mean(rts)
for threads, rts in thread_rt_map.items()
}
# Sort by thread count
sorted_threads = sorted(thread_avg_rt.keys())
# Look for significant increases in response time
for i in range(1, len(sorted_threads)):
prev_threads = sorted_threads[i-1]
curr_threads = sorted_threads[i]
prev_rt = thread_avg_rt[prev_threads]
curr_rt = thread_avg_rt[curr_threads]
# Check if response time increased by more than 50%
if curr_rt > prev_rt * 1.5:
degradation_threshold = curr_threads
has_degradation = True
break
return {
"correlation": correlation,
"degradation_threshold": degradation_threshold,
"has_degradation": has_degradation
}
def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
"""Calculate Pearson correlation coefficient between two lists.
Args:
x: First list of values
y: Second list of values
Returns:
Correlation coefficient (-1 to 1)
"""
if len(x) != len(y) or len(x) < 2:
return 0
# Calculate means
mean_x = statistics.mean(x)
mean_y = statistics.mean(y)
# Calculate numerator and denominators
numerator = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
denom_x = sum((xi - mean_x) ** 2 for xi in x)
denom_y = sum((yi - mean_y) ** 2 for yi in y)
# Calculate correlation
if denom_x == 0 or denom_y == 0:
return 0
return numerator / ((denom_x ** 0.5) * (denom_y ** 0.5))
```
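A minimal sketch chaining the calculator into the bottleneck analyzer, again assuming the model constructors used in the tests; the endpoint names and numbers are made up for illustration:
```python
# Minimal sketch: one flat endpoint and one slow, error-prone endpoint.
from datetime import datetime, timedelta

from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.models import Sample, TestResults

results = TestResults()
start = datetime(2023, 1, 1, 12, 0, 0)
for i in range(20):
    results.add_sample(Sample(timestamp=start + timedelta(seconds=i), label="Fast",
                              response_time=100, success=True, response_code="200"))
    ok = i % 5 != 0
    results.add_sample(Sample(timestamp=start + timedelta(seconds=i), label="Slow",
                              response_time=400 + i * 10, success=ok,
                              response_code="200" if ok else "500"))

calculator = MetricsCalculator()
analyzer = BottleneckAnalyzer()
endpoint_metrics = calculator.calculate_endpoint_metrics(results)
time_series = calculator.calculate_time_series_metrics(results, interval_seconds=5)

for b in analyzer.identify_slow_endpoints(endpoint_metrics, threshold_factor=1.5):
    print("slow:", b.endpoint, round(b.value, 1), b.severity)
for b in analyzer.identify_error_prone_endpoints(endpoint_metrics, threshold_error_rate=5.0):
    print("errors:", b.endpoint, b.value, b.severity)
print(analyzer.analyze_concurrency_impact(time_series))
```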
--------------------------------------------------------------------------------
/tests/test_visualization_engine.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the visualization engine.
"""
import os
import tempfile
import unittest
from datetime import datetime, timedelta
from analyzer.models import EndpointMetrics, TimeSeriesMetrics
from analyzer.visualization.engine import VisualizationEngine
class TestVisualizationEngine(unittest.TestCase):
"""Tests for the VisualizationEngine class."""
def setUp(self):
"""Set up test fixtures."""
# Create a temporary directory for output files
self.temp_dir = tempfile.mkdtemp()
self.engine = VisualizationEngine(output_dir=self.temp_dir)
# Create time series metrics
base_time = datetime(2023, 1, 1, 12, 0, 0)
self.time_series_metrics = [
TimeSeriesMetrics(
timestamp=base_time + timedelta(seconds=i * 5),
active_threads=i + 1,
throughput=10.0 + i * 0.5,
average_response_time=100.0 + i * 20,
error_rate=0.0 if i < 8 else 5.0
)
for i in range(10)
]
# Create endpoint metrics
self.endpoint_metrics = {
"Endpoint 1": EndpointMetrics(
endpoint="Endpoint 1",
total_samples=100,
error_count=0,
error_rate=0.0,
average_response_time=100.0,
median_response_time=95.0,
percentile_90=150.0,
percentile_95=180.0,
percentile_99=200.0,
min_response_time=50.0,
max_response_time=250.0,
throughput=10.0,
test_duration=10.0
),
"Endpoint 2": EndpointMetrics(
endpoint="Endpoint 2",
total_samples=100,
error_count=5,
error_rate=5.0,
average_response_time=200.0,
median_response_time=190.0,
percentile_90=300.0,
percentile_95=350.0,
percentile_99=400.0,
min_response_time=100.0,
max_response_time=450.0,
throughput=8.0,
test_duration=10.0
),
"Endpoint 3": EndpointMetrics(
endpoint="Endpoint 3",
total_samples=100,
error_count=10,
error_rate=10.0,
average_response_time=300.0,
median_response_time=280.0,
percentile_90=450.0,
percentile_95=500.0,
percentile_99=600.0,
min_response_time=150.0,
max_response_time=700.0,
throughput=5.0,
test_duration=10.0
)
}
# Create response times
self.response_times = [100, 120, 130, 140, 150, 160, 170, 180, 190, 200,
220, 240, 260, 280, 300, 350, 400, 450, 500, 600]
# Create analysis results
self.analysis_results = {
"summary": {
"total_samples": 300,
"error_count": 15,
"error_rate": 5.0,
"average_response_time": 200.0,
"median_response_time": 180.0,
"percentile_90": 350.0,
"percentile_95": 400.0,
"percentile_99": 500.0,
"min_response_time": 100.0,
"max_response_time": 600.0,
"throughput": 7.5,
"start_time": datetime(2023, 1, 1, 12, 0, 0),
"end_time": datetime(2023, 1, 1, 12, 0, 40),
"duration": 40.0
},
"detailed": {
"endpoints": {
"Endpoint 1": {
"total_samples": 100,
"error_count": 0,
"error_rate": 0.0,
"average_response_time": 100.0,
"median_response_time": 95.0,
"percentile_90": 150.0,
"percentile_95": 180.0,
"percentile_99": 200.0,
"min_response_time": 50.0,
"max_response_time": 250.0,
"throughput": 10.0
},
"Endpoint 2": {
"total_samples": 100,
"error_count": 5,
"error_rate": 5.0,
"average_response_time": 200.0,
"median_response_time": 190.0,
"percentile_90": 300.0,
"percentile_95": 350.0,
"percentile_99": 400.0,
"min_response_time": 100.0,
"max_response_time": 450.0,
"throughput": 8.0
},
"Endpoint 3": {
"total_samples": 100,
"error_count": 10,
"error_rate": 10.0,
"average_response_time": 300.0,
"median_response_time": 280.0,
"percentile_90": 450.0,
"percentile_95": 500.0,
"percentile_99": 600.0,
"min_response_time": 150.0,
"max_response_time": 700.0,
"throughput": 5.0
}
},
"bottlenecks": {
"slow_endpoints": [
{
"endpoint": "Endpoint 3",
"response_time": 300.0,
"threshold": 200.0,
"severity": "high"
}
],
"error_prone_endpoints": [
{
"endpoint": "Endpoint 3",
"error_rate": 10.0,
"threshold": 5.0,
"severity": "medium"
}
]
},
"insights": {
"recommendations": [
{
"issue": "High response time in Endpoint 3",
"recommendation": "Optimize database queries",
"expected_impact": "Reduced response time",
"implementation_difficulty": "medium",
"priority_level": "high"
}
],
"scaling_insights": [
{
"topic": "Concurrency Impact",
"description": "Performance degrades with increasing concurrency"
}
]
}
}
}
def tearDown(self):
"""Tear down test fixtures."""
# Clean up temporary directory
for file in os.listdir(self.temp_dir):
os.remove(os.path.join(self.temp_dir, file))
os.rmdir(self.temp_dir)
def test_create_time_series_graph(self):
"""Test creating a time series graph."""
# Test with default parameters
graph = self.engine.create_time_series_graph(self.time_series_metrics)
self.assertIsNotNone(graph)
self.assertEqual(graph["type"], "time_series")
# Test with output file
output_file = "time_series.txt"
output_path = self.engine.create_time_series_graph(
self.time_series_metrics, output_file=output_file)
self.assertTrue(os.path.exists(output_path))
# Test with different metric
graph = self.engine.create_time_series_graph(
self.time_series_metrics, metric_name="throughput")
self.assertIsNotNone(graph)
self.assertEqual(graph["y_label"], "Throughput (requests/second)")
# Test with empty metrics
with self.assertRaises(ValueError):
self.engine.create_time_series_graph([])
# Test with invalid metric name
with self.assertRaises(ValueError):
self.engine.create_time_series_graph(
self.time_series_metrics, metric_name="invalid_metric")
def test_create_distribution_graph(self):
"""Test creating a distribution graph."""
# Test with default parameters
graph = self.engine.create_distribution_graph(self.response_times)
self.assertIsNotNone(graph)
self.assertEqual(graph["type"], "distribution")
# Test with output file
output_file = "distribution.txt"
output_path = self.engine.create_distribution_graph(
self.response_times, output_file=output_file)
self.assertTrue(os.path.exists(output_path))
# Test with custom percentiles
graph = self.engine.create_distribution_graph(
self.response_times, percentiles=[25, 50, 75])
self.assertIsNotNone(graph)
self.assertIn(25, graph["percentiles"])
self.assertIn(50, graph["percentiles"])
self.assertIn(75, graph["percentiles"])
# Test with empty response times
with self.assertRaises(ValueError):
self.engine.create_distribution_graph([])
def test_create_endpoint_comparison_chart(self):
"""Test creating an endpoint comparison chart."""
# Test with default parameters
chart = self.engine.create_endpoint_comparison_chart(self.endpoint_metrics)
self.assertIsNotNone(chart)
self.assertEqual(chart["type"], "endpoint_comparison")
# Test with output file
output_file = "comparison.txt"
output_path = self.engine.create_endpoint_comparison_chart(
self.endpoint_metrics, output_file=output_file)
self.assertTrue(os.path.exists(output_path))
# Test with different metric
chart = self.engine.create_endpoint_comparison_chart(
self.endpoint_metrics, metric_name="error_rate")
self.assertIsNotNone(chart)
self.assertEqual(chart["x_label"], "Error Rate (%)")
# Test with empty metrics
with self.assertRaises(ValueError):
self.engine.create_endpoint_comparison_chart({})
# Test with invalid metric name
with self.assertRaises(ValueError):
self.engine.create_endpoint_comparison_chart(
self.endpoint_metrics, metric_name="invalid_metric")
def test_create_html_report(self):
"""Test creating an HTML report."""
output_file = "report.html"
output_path = self.engine.create_html_report(
self.analysis_results, output_file=output_file)
# Check that the file exists
self.assertTrue(os.path.exists(output_path))
# Check that the file contains expected content
with open(output_path, 'r') as f:
content = f.read()
self.assertIn("JMeter Test Results Analysis", content)
self.assertIn("Endpoint Analysis", content)
self.assertIn("Bottleneck Analysis", content)
self.assertIn("Insights and Recommendations", content)
def test_figure_to_base64(self):
"""Test converting a figure to base64."""
graph = self.engine.create_time_series_graph(self.time_series_metrics)
base64_str = self.engine.figure_to_base64(graph)
# Check that the result is a non-empty string
self.assertIsInstance(base64_str, str)
self.assertTrue(len(base64_str) > 0)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/analyzer/analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Main analyzer module for JMeter test results.
This module provides the main entry point for analyzing JMeter test results.
It orchestrates the flow of data through the various components of the analyzer.
"""
from pathlib import Path
from datetime import timedelta
from typing import Dict, List, Optional, Union
from analyzer.models import TestResults, OverallMetrics, Bottleneck
from analyzer.parser.base import JTLParser
from analyzer.parser.xml_parser import XMLJTLParser
from analyzer.parser.csv_parser import CSVJTLParser
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.insights.generator import InsightsGenerator
class TestResultsAnalyzer:
"""Main analyzer class for JMeter test results."""
def __init__(self):
"""Initialize the analyzer."""
self.parsers = {}
self.metrics_calculator = MetricsCalculator()
self.bottleneck_analyzer = BottleneckAnalyzer()
self.insights_generator = InsightsGenerator()
# Register default parsers
self.register_parser('xml', XMLJTLParser())
self.register_parser('csv', CSVJTLParser())
def register_parser(self, format_name: str, parser: JTLParser) -> None:
"""Register a parser for a specific format.
Args:
format_name: Name of the format (e.g., 'xml', 'csv')
parser: Parser instance
"""
self.parsers[format_name] = parser
def analyze_file(self, file_path: Union[str, Path],
detailed: bool = False) -> Dict:
"""Analyze a JTL file and return the results.
Args:
file_path: Path to the JTL file
detailed: Whether to include detailed analysis
Returns:
Dictionary containing analysis results
Raises:
FileNotFoundError: If the file does not exist
ValueError: If the file format is invalid or unsupported
"""
path = Path(file_path)
# Validate file
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Detect format
format_name = JTLParser.detect_format(path)
# Get appropriate parser
if format_name not in self.parsers:
raise ValueError(f"No parser available for format: {format_name}")
parser = self.parsers[format_name]
# Parse file
test_results = parser.parse_file(path)
# Perform analysis
analysis_results = self._analyze_results(test_results, detailed)
return analysis_results
def _analyze_results(self, test_results: TestResults,
detailed: bool = False) -> Dict:
"""Analyze test results and return the analysis.
Args:
test_results: TestResults object
detailed: Whether to include detailed analysis
Returns:
Dictionary containing analysis results
"""
# Calculate overall metrics
overall_metrics = self.metrics_calculator.calculate_overall_metrics(test_results)
# Create basic results structure
results = {
"summary": {
"total_samples": overall_metrics.total_samples,
"error_count": overall_metrics.error_count,
"error_rate": overall_metrics.error_rate,
"average_response_time": overall_metrics.average_response_time,
"median_response_time": overall_metrics.median_response_time,
"percentile_90": overall_metrics.percentile_90,
"percentile_95": overall_metrics.percentile_95,
"percentile_99": overall_metrics.percentile_99,
"min_response_time": overall_metrics.min_response_time,
"max_response_time": overall_metrics.max_response_time,
"throughput": overall_metrics.throughput,
"start_time": test_results.start_time,
"end_time": test_results.end_time,
"duration": overall_metrics.test_duration
}
}
# Add detailed analysis if requested
if detailed:
# Calculate endpoint metrics
endpoint_metrics = self.metrics_calculator.calculate_endpoint_metrics(test_results)
# Calculate time series metrics (5-second intervals)
try:
time_series_metrics = self.metrics_calculator.calculate_time_series_metrics(
test_results, interval_seconds=5)
except ValueError:
time_series_metrics = []
# Identify bottlenecks
slow_endpoints = self.bottleneck_analyzer.identify_slow_endpoints(endpoint_metrics)
error_prone_endpoints = self.bottleneck_analyzer.identify_error_prone_endpoints(endpoint_metrics)
anomalies = self.bottleneck_analyzer.detect_anomalies(time_series_metrics)
concurrency_impact = self.bottleneck_analyzer.analyze_concurrency_impact(time_series_metrics)
# Generate insights and recommendations
all_bottlenecks = slow_endpoints + error_prone_endpoints
bottleneck_recommendations = self.insights_generator.generate_bottleneck_recommendations(all_bottlenecks)
# Create error analysis
error_analysis = self._create_error_analysis(test_results)
error_recommendations = self.insights_generator.generate_error_recommendations(error_analysis)
# Generate scaling insights
scaling_insights = self.insights_generator.generate_scaling_insights(concurrency_impact)
# Prioritize all recommendations
all_recommendations = bottleneck_recommendations + error_recommendations
prioritized_recommendations = self.insights_generator.prioritize_recommendations(all_recommendations)
# Add to results
results["detailed"] = {
"samples_count": len(test_results.samples),
"endpoints": {
endpoint: {
"total_samples": metrics.total_samples,
"error_count": metrics.error_count,
"error_rate": metrics.error_rate,
"average_response_time": metrics.average_response_time,
"median_response_time": metrics.median_response_time,
"percentile_90": metrics.percentile_90,
"percentile_95": metrics.percentile_95,
"percentile_99": metrics.percentile_99,
"min_response_time": metrics.min_response_time,
"max_response_time": metrics.max_response_time,
"throughput": metrics.throughput
}
for endpoint, metrics in endpoint_metrics.items()
},
"time_series": [
{
"timestamp": metrics.timestamp.isoformat(),
"active_threads": metrics.active_threads,
"throughput": metrics.throughput,
"average_response_time": metrics.average_response_time,
"error_rate": metrics.error_rate
}
for metrics in time_series_metrics
],
"bottlenecks": {
"slow_endpoints": [
{
"endpoint": bottleneck.endpoint,
"response_time": bottleneck.value,
"threshold": bottleneck.threshold,
"severity": bottleneck.severity
}
for bottleneck in slow_endpoints
],
"error_prone_endpoints": [
{
"endpoint": bottleneck.endpoint,
"error_rate": bottleneck.value,
"threshold": bottleneck.threshold,
"severity": bottleneck.severity
}
for bottleneck in error_prone_endpoints
],
"anomalies": [
{
"timestamp": anomaly.timestamp.isoformat(),
"expected_value": anomaly.expected_value,
"actual_value": anomaly.actual_value,
"deviation_percentage": anomaly.deviation_percentage
}
for anomaly in anomalies
],
"concurrency_impact": concurrency_impact
},
"insights": {
"recommendations": [
{
"issue": rec["recommendation"].issue,
"recommendation": rec["recommendation"].recommendation,
"expected_impact": rec["recommendation"].expected_impact,
"implementation_difficulty": rec["recommendation"].implementation_difficulty,
"priority_level": rec["priority_level"]
}
for rec in prioritized_recommendations
],
"scaling_insights": [
{
"topic": insight.topic,
"description": insight.description
}
for insight in scaling_insights
]
}
}
return results
def _create_error_analysis(self, test_results: TestResults) -> Dict:
"""Create error analysis from test results.
Args:
test_results: TestResults object
Returns:
Dictionary containing error analysis
"""
# Extract error samples
error_samples = [sample for sample in test_results.samples if not sample.success]
if not error_samples:
return {"error_types": {}, "error_patterns": []}
# Count error types
error_types = {}
for sample in error_samples:
error_message = sample.error_message or f"HTTP {sample.response_code}"
if error_message in error_types:
error_types[error_message] += 1
else:
error_types[error_message] = 1
# Detect error patterns
error_patterns = []
# Check for error spikes
if test_results.start_time and test_results.end_time:
# Group errors by time intervals (5-second intervals)
interval_seconds = 5
duration = (test_results.end_time - test_results.start_time).total_seconds()
num_intervals = int(duration / interval_seconds) + 1
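            # Example: a 23-second test with 5-second intervals yields int(23 / 5) + 1 = 5
            # buckets, so the trailing partial interval is still counted.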
# Count errors in each interval
interval_errors = [0] * num_intervals
for sample in error_samples:
interval_index = int((sample.timestamp - test_results.start_time).total_seconds() / interval_seconds)
if 0 <= interval_index < num_intervals:
interval_errors[interval_index] += 1
# Calculate average errors per interval
avg_errors = sum(interval_errors) / len(interval_errors) if interval_errors else 0
# Detect spikes (intervals with errors > 2 * average)
for i, error_count in enumerate(interval_errors):
if error_count > 2 * avg_errors and error_count > 1:
spike_time = test_results.start_time + timedelta(seconds=i * interval_seconds)
error_patterns.append({
"type": "spike",
"timestamp": spike_time.isoformat(),
"error_count": error_count
})
return {
"error_types": error_types,
"error_patterns": error_patterns
}
```
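For orientation, here is a minimal, hedged sketch of driving `TestResultsAnalyzer` from a script; the file name `results.jtl` is hypothetical, and the dictionary keys used below are the ones built in `_analyze_results` above.

```python
from pathlib import Path

from analyzer.analyzer import TestResultsAnalyzer

analyzer = TestResultsAnalyzer()

# analyze_file() auto-detects XML vs CSV via JTLParser.detect_format()
# and raises FileNotFoundError / ValueError on bad input.
results = analyzer.analyze_file(Path("results.jtl"), detailed=True)

summary = results["summary"]
print(f"Samples: {summary['total_samples']}, error rate: {summary['error_rate']:.2f}%")

# The "detailed" block is only present when detailed=True.
for endpoint, metrics in results.get("detailed", {}).get("endpoints", {}).items():
    print(f"{endpoint}: avg {metrics['average_response_time']:.2f} ms")
```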
--------------------------------------------------------------------------------
/analyzer/insights/generator.py:
--------------------------------------------------------------------------------
```python
"""
Insights generator for JMeter test results.
This module provides functionality for generating insights and recommendations
based on JMeter test results analysis, including bottleneck recommendations,
error pattern analysis, scaling insights, and recommendation prioritization.
"""
from typing import Dict, List, Optional, Tuple, Union
from analyzer.models import (Bottleneck, EndpointMetrics, Insight,
OverallMetrics, Recommendation, TestResults)
class InsightsGenerator:
"""Generator for insights and recommendations based on test results analysis."""
def generate_bottleneck_recommendations(self, bottlenecks: List[Bottleneck]) -> List[Recommendation]:
"""Generate recommendations for addressing identified bottlenecks.
Args:
bottlenecks: List of Bottleneck objects
Returns:
List of Recommendation objects
"""
recommendations = []
# Process response time bottlenecks
response_time_bottlenecks = [b for b in bottlenecks if b.metric_type == "response_time"]
if response_time_bottlenecks:
# Group by severity
high_severity = [b for b in response_time_bottlenecks if b.severity == "high"]
medium_severity = [b for b in response_time_bottlenecks if b.severity == "medium"]
# Generate recommendations for high severity bottlenecks
if high_severity:
endpoints = ", ".join(b.endpoint for b in high_severity[:3])
recommendation = Recommendation(
issue=f"Critical response time issues in endpoints: {endpoints}",
recommendation="Optimize database queries, add caching, or consider asynchronous processing for these endpoints",
expected_impact="Significant reduction in response times and improved user experience",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
# Generate recommendations for medium severity bottlenecks
if medium_severity:
endpoints = ", ".join(b.endpoint for b in medium_severity[:3])
recommendation = Recommendation(
issue=f"Moderate response time issues in endpoints: {endpoints}",
recommendation="Profile the code to identify bottlenecks and optimize the most expensive operations",
expected_impact="Moderate improvement in response times",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
# Process error rate bottlenecks
error_rate_bottlenecks = [b for b in bottlenecks if b.metric_type == "error_rate"]
if error_rate_bottlenecks:
# Group by severity
high_severity = [b for b in error_rate_bottlenecks if b.severity == "high"]
medium_severity = [b for b in error_rate_bottlenecks if b.severity == "medium"]
# Generate recommendations for high severity bottlenecks
if high_severity:
endpoints = ", ".join(b.endpoint for b in high_severity[:3])
recommendation = Recommendation(
issue=f"High error rates in endpoints: {endpoints}",
recommendation="Investigate error logs, add proper error handling, and fix the root causes of errors",
expected_impact="Significant reduction in error rates and improved reliability",
implementation_difficulty="high"
)
recommendations.append(recommendation)
# Generate recommendations for medium severity bottlenecks
if medium_severity:
endpoints = ", ".join(b.endpoint for b in medium_severity[:3])
recommendation = Recommendation(
issue=f"Moderate error rates in endpoints: {endpoints}",
recommendation="Review error handling and add appropriate validation and error recovery mechanisms",
expected_impact="Moderate reduction in error rates",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
return recommendations
def generate_error_recommendations(self, error_analysis: Dict) -> List[Recommendation]:
"""Generate recommendations for addressing error patterns.
Args:
error_analysis: Dictionary containing error analysis results
Returns:
List of Recommendation objects
"""
recommendations = []
# Process error types
error_types = error_analysis.get("error_types", {})
if error_types:
# Find the most common error types
sorted_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)
top_errors = sorted_errors[:3]
for error_type, count in top_errors:
if "timeout" in error_type.lower():
recommendation = Recommendation(
issue=f"Timeout errors ({count} occurrences)",
recommendation="Increase timeout thresholds, optimize slow operations, or implement circuit breakers",
expected_impact="Reduction in timeout errors and improved reliability",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
elif "connection" in error_type.lower():
recommendation = Recommendation(
issue=f"Connection errors ({count} occurrences)",
recommendation="Implement connection pooling, retry mechanisms, or check network configuration",
expected_impact="Improved connection stability and reduced errors",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
elif "500" in error_type or "server" in error_type.lower():
recommendation = Recommendation(
issue=f"Server errors ({count} occurrences)",
recommendation="Check server logs, fix application bugs, and add proper error handling",
expected_impact="Reduction in server errors and improved reliability",
implementation_difficulty="high"
)
recommendations.append(recommendation)
elif "400" in error_type or "client" in error_type.lower():
recommendation = Recommendation(
issue=f"Client errors ({count} occurrences)",
recommendation="Validate input data, fix client-side issues, and improve error messages",
expected_impact="Reduction in client errors and improved user experience",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
else:
recommendation = Recommendation(
issue=f"{error_type} errors ({count} occurrences)",
recommendation="Investigate the root cause and implement appropriate error handling",
expected_impact="Reduction in errors and improved reliability",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
# Process error patterns
error_patterns = error_analysis.get("error_patterns", [])
if error_patterns:
for pattern in error_patterns:
pattern_type = pattern.get("type", "")
if pattern_type == "spike":
recommendation = Recommendation(
issue="Error spike detected during the test",
recommendation="Investigate what happened during the spike period and address the underlying cause",
expected_impact="Prevention of error spikes in production",
implementation_difficulty="medium"
)
recommendations.append(recommendation)
elif pattern_type == "increasing":
recommendation = Recommendation(
issue="Increasing error rate over time",
recommendation="Check for resource leaks, memory issues, or degrading performance under load",
expected_impact="Stable error rates during extended usage",
implementation_difficulty="high"
)
recommendations.append(recommendation)
return recommendations
def generate_scaling_insights(self, concurrency_analysis: Dict) -> List[Insight]:
"""Generate insights on scaling behavior and capacity limits.
Args:
concurrency_analysis: Dictionary containing concurrency analysis results
Returns:
List of Insight objects
"""
insights = []
correlation = concurrency_analysis.get("correlation", 0)
has_degradation = concurrency_analysis.get("has_degradation", False)
degradation_threshold = concurrency_analysis.get("degradation_threshold", 0)
# Generate insights based on correlation
if correlation > 0.8:
insight = Insight(
topic="Strong Correlation with Concurrency",
description="There is a strong correlation between the number of concurrent users and response times, indicating potential scalability issues",
supporting_data={"correlation": correlation}
)
insights.append(insight)
elif correlation > 0.5:
insight = Insight(
topic="Moderate Correlation with Concurrency",
description="There is a moderate correlation between the number of concurrent users and response times, suggesting some scalability concerns",
supporting_data={"correlation": correlation}
)
insights.append(insight)
        elif -0.2 < correlation < 0.2:
insight = Insight(
topic="No Correlation with Concurrency",
description="There is little to no correlation between the number of concurrent users and response times, suggesting good scalability",
supporting_data={"correlation": correlation}
)
insights.append(insight)
# Generate insights based on degradation threshold
if has_degradation:
insight = Insight(
topic="Performance Degradation Threshold",
description=f"Performance begins to degrade significantly at {degradation_threshold} concurrent users, indicating a potential capacity limit",
supporting_data={"degradation_threshold": degradation_threshold}
)
insights.append(insight)
# Add recommendation for addressing the degradation
if degradation_threshold > 0:
insight = Insight(
topic="Scaling Recommendation",
description=f"Consider horizontal scaling or optimization before reaching {degradation_threshold} concurrent users to maintain performance",
supporting_data={"degradation_threshold": degradation_threshold}
)
insights.append(insight)
else:
insight = Insight(
topic="No Performance Degradation Detected",
description="No significant performance degradation was detected with increasing concurrent users within the tested range",
supporting_data={}
)
insights.append(insight)
return insights
def prioritize_recommendations(self, recommendations: List[Recommendation]) -> List[Dict]:
"""Prioritize recommendations based on potential impact.
Args:
recommendations: List of Recommendation objects
Returns:
List of dictionaries containing prioritized recommendations
"""
if not recommendations:
return []
# Define scoring system
severity_scores = {
"high": 3,
"medium": 2,
"low": 1
}
difficulty_scores = {
"low": 3,
"medium": 2,
"high": 1
}
# Calculate priority score for each recommendation
prioritized = []
for recommendation in recommendations:
# Extract severity from the issue (if available)
severity = "medium" # Default
if "critical" in recommendation.issue.lower():
severity = "high"
elif "moderate" in recommendation.issue.lower():
severity = "medium"
elif "minor" in recommendation.issue.lower():
severity = "low"
# Get difficulty
difficulty = recommendation.implementation_difficulty
# Calculate priority score (higher is more important)
severity_score = severity_scores.get(severity, 2)
difficulty_score = difficulty_scores.get(difficulty, 2)
# Priority formula: severity * 2 + ease of implementation
# This weights severity more heavily than implementation difficulty
priority_score = severity_score * 2 + difficulty_score
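            # Worked example: a "critical" issue with "low" difficulty scores 3*2 + 3 = 9
            # (priority level "critical"), while a "moderate" issue with "high" difficulty
            # scores 2*2 + 1 = 5 (priority level "high").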
prioritized.append({
"recommendation": recommendation,
"priority_score": priority_score,
"priority_level": self._get_priority_level(priority_score)
})
# Sort by priority score (descending)
prioritized.sort(key=lambda x: x["priority_score"], reverse=True)
return prioritized
def _get_priority_level(self, score: int) -> str:
"""Convert a priority score to a priority level.
Args:
score: Priority score
Returns:
Priority level string
"""
if score >= 7:
return "critical"
elif score >= 5:
return "high"
elif score >= 3:
return "medium"
else:
return "low"
```
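As a small usage sketch (the endpoint names and wording below are made up), the prioritizer above can be exercised directly with hand-built `Recommendation` objects:

```python
from analyzer.insights.generator import InsightsGenerator
from analyzer.models import Recommendation

generator = InsightsGenerator()

recommendations = [
    Recommendation(
        issue="Critical response time issues in endpoints: /checkout",
        recommendation="Add caching in front of the pricing lookup",
        expected_impact="Significant reduction in response times",
        implementation_difficulty="low",
    ),
    Recommendation(
        issue="Moderate error rates in endpoints: /login",
        recommendation="Add retries around the auth service call",
        expected_impact="Moderate reduction in error rates",
        implementation_difficulty="high",
    ),
]

for item in generator.prioritize_recommendations(recommendations):
    # "critical" + "low" difficulty scores 3*2 + 3 = 9 -> critical
    # "moderate" + "high" difficulty scores 2*2 + 1 = 5 -> high
    print(item["priority_level"], "-", item["recommendation"].issue)
```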
--------------------------------------------------------------------------------
/analyzer/visualization/engine.py:
--------------------------------------------------------------------------------
```python
"""
Visualization engine for JMeter test results.
This module provides functionality for creating visual representations
of JMeter test results analysis, including time series graphs, distribution
graphs, endpoint comparison charts, and visualization output formats.
"""
import base64
import io
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# Note: a production implementation would render these charts with matplotlib.
# To keep this module free of external dependencies, the methods below emit
# simple dict/text representations of each graph instead.
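# As an illustration only (not used by the code below), a matplotlib-based variant
# of create_time_series_graph might render roughly as:
#     fig, ax = plt.subplots()
#     ax.plot(timestamps, values)
#     ax.set_title(graph_title); ax.set_xlabel("Time"); ax.set_ylabel(y_label)
#     fig.savefig(output_path)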
class VisualizationEngine:
"""Engine for creating visual representations of test results analysis."""
def __init__(self, output_dir: Optional[str] = None):
"""Initialize the visualization engine.
Args:
output_dir: Directory to save visualization files (default: None)
"""
self.output_dir = output_dir
if output_dir:
os.makedirs(output_dir, exist_ok=True)
def create_time_series_graph(self, time_series_metrics: List,
metric_name: str = "average_response_time",
title: Optional[str] = None,
output_file: Optional[str] = None) -> Union[str, Dict]:
"""Create a time-series graph showing performance over the test duration.
Args:
time_series_metrics: List of TimeSeriesMetrics objects
metric_name: Name of the metric to plot (default: "average_response_time")
title: Graph title (default: None)
output_file: Path to save the graph (default: None)
Returns:
Path to the saved graph file or Figure object
"""
if not time_series_metrics:
raise ValueError("No time series metrics provided")
# Extract data
timestamps = [metrics.timestamp for metrics in time_series_metrics]
if metric_name == "average_response_time":
values = [metrics.average_response_time for metrics in time_series_metrics]
y_label = "Response Time (ms)"
graph_title = title or "Response Time Over Time"
elif metric_name == "throughput":
values = [metrics.throughput for metrics in time_series_metrics]
y_label = "Throughput (requests/second)"
graph_title = title or "Throughput Over Time"
elif metric_name == "error_rate":
values = [metrics.error_rate for metrics in time_series_metrics]
y_label = "Error Rate (%)"
graph_title = title or "Error Rate Over Time"
elif metric_name == "active_threads":
values = [metrics.active_threads for metrics in time_series_metrics]
y_label = "Active Threads"
graph_title = title or "Active Threads Over Time"
else:
raise ValueError(f"Unknown metric name: {metric_name}")
# Create a simple representation of the graph
graph = {
"type": "time_series",
"title": graph_title,
"x_label": "Time",
"y_label": y_label,
"timestamps": [ts.isoformat() for ts in timestamps],
"values": values
}
# Save or return
if output_file:
output_path = self._get_output_path(output_file)
with open(output_path, 'w') as f:
f.write(f"Time Series Graph: {graph_title}\n")
f.write(f"X-axis: Time\n")
f.write(f"Y-axis: {y_label}\n")
f.write("Data:\n")
for ts, val in zip(timestamps, values):
f.write(f"{ts.isoformat()}: {val}\n")
return output_path
else:
return graph
def create_distribution_graph(self, response_times: List[float],
percentiles: List[int] = [50, 90, 95, 99],
title: Optional[str] = None,
output_file: Optional[str] = None) -> Union[str, Dict]:
"""Create a distribution graph showing response time distributions.
Args:
response_times: List of response times
percentiles: List of percentiles to mark (default: [50, 90, 95, 99])
title: Graph title (default: None)
output_file: Path to save the graph (default: None)
Returns:
Path to the saved graph file or Figure object
"""
if not response_times:
raise ValueError("No response times provided")
# Calculate percentile values
percentile_values = {}
for p in percentiles:
percentile_values[p] = self._calculate_percentile(response_times, p)
# Create a simple representation of the graph
graph_title = title or "Response Time Distribution"
graph = {
"type": "distribution",
"title": graph_title,
"x_label": "Response Time (ms)",
"y_label": "Frequency",
"response_times": response_times,
"percentiles": percentile_values
}
# Save or return
if output_file:
output_path = self._get_output_path(output_file)
with open(output_path, 'w') as f:
f.write(f"Distribution Graph: {graph_title}\n")
f.write(f"X-axis: Response Time (ms)\n")
f.write(f"Y-axis: Frequency\n")
f.write("Percentiles:\n")
for p, val in percentile_values.items():
f.write(f"{p}th Percentile: {val:.2f} ms\n")
return output_path
else:
return graph
def create_endpoint_comparison_chart(self, endpoint_metrics: Dict,
metric_name: str = "average_response_time",
top_n: int = 10,
title: Optional[str] = None,
output_file: Optional[str] = None) -> Union[str, Dict]:
"""Create a comparison chart for different endpoints.
Args:
endpoint_metrics: Dictionary mapping endpoint names to EndpointMetrics objects
metric_name: Name of the metric to compare (default: "average_response_time")
top_n: Number of top endpoints to include (default: 10)
title: Chart title (default: None)
output_file: Path to save the chart (default: None)
Returns:
Path to the saved chart file or Figure object
"""
if not endpoint_metrics:
raise ValueError("No endpoint metrics provided")
# Extract data
if metric_name == "average_response_time":
values = {endpoint: metrics.average_response_time for endpoint, metrics in endpoint_metrics.items()}
y_label = "Average Response Time (ms)"
chart_title = title or "Endpoint Response Time Comparison"
elif metric_name == "error_rate":
values = {endpoint: metrics.error_rate for endpoint, metrics in endpoint_metrics.items()}
y_label = "Error Rate (%)"
chart_title = title or "Endpoint Error Rate Comparison"
elif metric_name == "throughput":
values = {endpoint: metrics.throughput for endpoint, metrics in endpoint_metrics.items()}
y_label = "Throughput (requests/second)"
chart_title = title or "Endpoint Throughput Comparison"
else:
raise ValueError(f"Unknown metric name: {metric_name}")
# Sort endpoints by value (descending) and take top N
sorted_endpoints = sorted(values.items(), key=lambda x: x[1], reverse=True)[:top_n]
endpoints = [item[0] for item in sorted_endpoints]
values_list = [item[1] for item in sorted_endpoints]
# Create a simple representation of the chart
chart = {
"type": "endpoint_comparison",
"title": chart_title,
"x_label": y_label,
"y_label": "Endpoint",
"endpoints": endpoints,
"values": values_list
}
# Save or return
if output_file:
output_path = self._get_output_path(output_file)
with open(output_path, 'w') as f:
f.write(f"Endpoint Comparison Chart: {chart_title}\n")
f.write(f"X-axis: {y_label}\n")
f.write(f"Y-axis: Endpoint\n")
f.write("Data:\n")
for endpoint, value in zip(endpoints, values_list):
f.write(f"{endpoint}: {value:.2f}\n")
return output_path
else:
return chart
def create_html_report(self, analysis_results: Dict, output_file: str) -> str:
"""Create an HTML report from analysis results.
Args:
analysis_results: Dictionary containing analysis results
output_file: Path to save the HTML report
Returns:
Path to the saved HTML report
"""
# Extract data
summary = analysis_results.get("summary", {})
detailed = analysis_results.get("detailed", {})
# Create HTML content
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>JMeter Test Results Analysis</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1, h2, h3 {{ color: #333; }}
table {{ border-collapse: collapse; width: 100%; margin-bottom: 20px; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
tr:nth-child(even) {{ background-color: #f9f9f9; }}
.chart {{ margin: 20px 0; max-width: 100%; }}
.section {{ margin-bottom: 30px; }}
.severity-high {{ color: #d9534f; }}
.severity-medium {{ color: #f0ad4e; }}
.severity-low {{ color: #5bc0de; }}
</style>
</head>
<body>
<h1>JMeter Test Results Analysis</h1>
<div class="section">
<h2>Summary</h2>
<table>
<tr><th>Metric</th><th>Value</th></tr>
<tr><td>Total Samples</td><td>{summary.get('total_samples', 'N/A')}</td></tr>
<tr><td>Error Count</td><td>{summary.get('error_count', 'N/A')}</td></tr>
<tr><td>Error Rate</td><td>{summary.get('error_rate', 'N/A'):.2f}%</td></tr>
<tr><td>Average Response Time</td><td>{summary.get('average_response_time', 'N/A'):.2f} ms</td></tr>
<tr><td>Median Response Time</td><td>{summary.get('median_response_time', 'N/A'):.2f} ms</td></tr>
<tr><td>90th Percentile</td><td>{summary.get('percentile_90', 'N/A'):.2f} ms</td></tr>
<tr><td>95th Percentile</td><td>{summary.get('percentile_95', 'N/A'):.2f} ms</td></tr>
<tr><td>99th Percentile</td><td>{summary.get('percentile_99', 'N/A'):.2f} ms</td></tr>
<tr><td>Min Response Time</td><td>{summary.get('min_response_time', 'N/A'):.2f} ms</td></tr>
<tr><td>Max Response Time</td><td>{summary.get('max_response_time', 'N/A'):.2f} ms</td></tr>
<tr><td>Throughput</td><td>{summary.get('throughput', 'N/A'):.2f} requests/second</td></tr>
<tr><td>Start Time</td><td>{summary.get('start_time', 'N/A')}</td></tr>
<tr><td>End Time</td><td>{summary.get('end_time', 'N/A')}</td></tr>
<tr><td>Duration</td><td>{summary.get('duration', 'N/A'):.2f} seconds</td></tr>
</table>
</div>
"""
# Add detailed information if available
if detailed:
# Add endpoint information
endpoints = detailed.get("endpoints", {})
if endpoints:
html_content += """
<div class="section">
<h2>Endpoint Analysis</h2>
<table>
<tr>
<th>Endpoint</th>
<th>Samples</th>
<th>Errors</th>
<th>Error Rate</th>
<th>Avg Response Time</th>
<th>95th Percentile</th>
<th>Throughput</th>
</tr>
"""
for endpoint, metrics in endpoints.items():
html_content += f"""
<tr>
<td>{endpoint}</td>
<td>{metrics['total_samples']}</td>
<td>{metrics['error_count']}</td>
<td>{metrics['error_rate']:.2f}%</td>
<td>{metrics['average_response_time']:.2f} ms</td>
<td>{metrics['percentile_95']:.2f} ms</td>
<td>{metrics['throughput']:.2f} req/s</td>
</tr>
"""
html_content += """
</table>
</div>
"""
# Add bottleneck information
bottlenecks = detailed.get("bottlenecks", {})
if bottlenecks:
html_content += """
<div class="section">
<h2>Bottleneck Analysis</h2>
"""
# Slow endpoints
slow_endpoints = bottlenecks.get("slow_endpoints", [])
if slow_endpoints:
html_content += """
<h3>Slow Endpoints</h3>
<table>
<tr>
<th>Endpoint</th>
<th>Response Time</th>
<th>Threshold</th>
<th>Severity</th>
</tr>
"""
for endpoint in slow_endpoints:
severity_class = f"severity-{endpoint.get('severity', 'medium')}"
html_content += f"""
<tr>
<td>{endpoint.get('endpoint')}</td>
<td>{endpoint.get('response_time', 'N/A'):.2f} ms</td>
<td>{endpoint.get('threshold', 'N/A'):.2f} ms</td>
<td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td>
</tr>
"""
html_content += """
</table>
"""
# Error-prone endpoints
error_endpoints = bottlenecks.get("error_prone_endpoints", [])
if error_endpoints:
html_content += """
<h3>Error-Prone Endpoints</h3>
<table>
<tr>
<th>Endpoint</th>
<th>Error Rate</th>
<th>Threshold</th>
<th>Severity</th>
</tr>
"""
for endpoint in error_endpoints:
severity_class = f"severity-{endpoint.get('severity', 'medium')}"
html_content += f"""
<tr>
<td>{endpoint.get('endpoint')}</td>
<td>{endpoint.get('error_rate', 'N/A'):.2f}%</td>
<td>{endpoint.get('threshold', 'N/A'):.2f}%</td>
<td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td>
</tr>
"""
html_content += """
</table>
"""
html_content += """
</div>
"""
# Add insights and recommendations
insights = detailed.get("insights", {})
if insights:
html_content += """
<div class="section">
<h2>Insights and Recommendations</h2>
"""
# Recommendations
recommendations = insights.get("recommendations", [])
if recommendations:
html_content += """
<h3>Recommendations</h3>
<table>
<tr>
<th>Priority</th>
<th>Issue</th>
<th>Recommendation</th>
<th>Expected Impact</th>
</tr>
"""
for rec in recommendations:
priority_level = rec.get('priority_level', 'medium')
severity_class = f"severity-{priority_level}"
html_content += f"""
<tr>
<td class="{severity_class}">{priority_level.upper()}</td>
<td>{rec.get('issue')}</td>
<td>{rec.get('recommendation')}</td>
<td>{rec.get('expected_impact')}</td>
</tr>
"""
html_content += """
</table>
"""
# Scaling insights
scaling_insights = insights.get("scaling_insights", [])
if scaling_insights:
html_content += """
<h3>Scaling Insights</h3>
<table>
<tr>
<th>Topic</th>
<th>Description</th>
</tr>
"""
for insight in scaling_insights:
html_content += f"""
<tr>
<td>{insight.get('topic')}</td>
<td>{insight.get('description')}</td>
</tr>
"""
html_content += """
</table>
"""
html_content += """
</div>
"""
# Close HTML
html_content += """
</body>
</html>
"""
# Save HTML report
output_path = self._get_output_path(output_file)
with open(output_path, 'w') as f:
f.write(html_content)
return output_path
def figure_to_base64(self, fig) -> str:
"""Convert a figure to a base64-encoded string.
Args:
fig: Figure object
Returns:
Base64-encoded string
"""
# In a real implementation, this would convert a matplotlib figure to base64
# For this simplified version, we'll just return a placeholder
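        # A real conversion (assuming a matplotlib Figure) might look like:
        #     buf = io.BytesIO()
        #     fig.savefig(buf, format="png")
        #     return base64.b64encode(buf.getvalue()).decode("ascii")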
return "base64_encoded_image_placeholder"
def _get_output_path(self, output_file: str) -> str:
"""Get the full path for an output file.
Args:
output_file: Output file name or path
Returns:
Full path to the output file
"""
if self.output_dir:
return os.path.join(self.output_dir, output_file)
else:
return output_file
def _calculate_percentile(self, values: List[float], percentile: float) -> float:
"""Calculate a percentile from values.
Args:
values: List of values
percentile: Percentile to calculate (0-100)
Returns:
Percentile value
"""
if not values:
return 0
# Sort values
sorted_values = sorted(values)
# Calculate index
index = (percentile / 100) * (len(sorted_values) - 1)
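        # Example: values [10, 20, 30, 40] and percentile 90 give index = 0.9 * 3 = 2.7,
        # so the result interpolates between 30 and 40: 30 + 0.7 * 10 = 37.0.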
# If index is an integer, return the value at that index
if index.is_integer():
return sorted_values[int(index)]
# Otherwise, interpolate between the two nearest values
lower_index = int(index)
upper_index = lower_index + 1
lower_value = sorted_values[lower_index]
upper_value = sorted_values[upper_index]
fraction = index - lower_index
return lower_value + (upper_value - lower_value) * fraction
```
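A brief, hedged example of using the engine on its own; the response times and file names below are fabricated:

```python
from analyzer.visualization.engine import VisualizationEngine

engine = VisualizationEngine(output_dir="reports")  # directory is created if missing

# With output_file set, a plain-text summary is written and its path returned.
path = engine.create_distribution_graph(
    [120.0, 135.5, 150.2, 980.0, 145.3],
    output_file="distribution.txt",
)
print(f"Distribution summary written to {path}")

# Without output_file, the dict representation is returned instead.
graph = engine.create_distribution_graph([120.0, 135.5, 150.2])
print(graph["percentiles"])  # e.g. {50: 135.5, 90: ..., 95: ..., 99: ...}
```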
--------------------------------------------------------------------------------
/jmeter_server.py:
--------------------------------------------------------------------------------
```python
from typing import Any
import subprocess
from pathlib import Path
from mcp.server.fastmcp import FastMCP
import os
import datetime
import uuid
import logging
from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize MCP server
mcp = FastMCP("jmeter")
async def run_jmeter(test_file: str, non_gui: bool = True, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str:
"""Run a JMeter test.
Args:
test_file: Path to the JMeter test file (.jmx)
non_gui: Run in non-GUI mode (default: True)
properties: Dictionary of JMeter properties to pass with -J (default: None)
generate_report: Whether to generate report dashboard after load test (default: False)
report_output_dir: Output folder for report dashboard (default: None)
log_file: Name of JTL file to log sample results to (default: None)
Returns:
str: JMeter execution output
"""
try:
# Convert to absolute path
test_file_path = Path(test_file).resolve()
# Validate file exists and is a .jmx file
if not test_file_path.exists():
return f"Error: Test file not found: {test_file}"
if not test_file_path.suffix == '.jmx':
return f"Error: Invalid file type. Expected .jmx file: {test_file}"
# Get JMeter binary path from environment
jmeter_bin = os.getenv('JMETER_BIN', 'jmeter')
java_opts = os.getenv('JMETER_JAVA_OPTS', '')
# Log the JMeter binary path and Java options
logger.info(f"JMeter binary path: {jmeter_bin}")
logger.debug(f"Java options: {java_opts}")
# Build command
cmd = [str(Path(jmeter_bin).resolve())]
if non_gui:
cmd.extend(['-n'])
cmd.extend(['-t', str(test_file_path)])
        # Add JMeter properties if provided
if properties:
for prop_name, prop_value in properties.items():
cmd.extend([f'-J{prop_name}={prop_value}'])
logger.debug(f"Adding property: -J{prop_name}={prop_value}")
# Add report generation options if requested
if generate_report and non_gui:
if log_file is None:
# Generate unique log file name if not specified
unique_id = generate_unique_id()
log_file = f"{test_file_path.stem}_{unique_id}_results.jtl"
logger.debug(f"Using generated unique log file: {log_file}")
cmd.extend(['-l', log_file])
cmd.extend(['-e'])
# Always ensure report_output_dir is unique
unique_id = unique_id if 'unique_id' in locals() else generate_unique_id()
if report_output_dir:
# Append unique identifier to user-provided report directory
original_dir = report_output_dir
report_output_dir = f"{original_dir}_{unique_id}"
logger.debug(f"Making user-provided report directory unique: {original_dir} -> {report_output_dir}")
else:
# Generate unique report output directory if not specified
report_output_dir = f"{test_file_path.stem}_{unique_id}_report"
logger.debug(f"Using generated unique report output directory: {report_output_dir}")
cmd.extend(['-o', report_output_dir])
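            # Illustrative final command for this branch:
            #   jmeter -n -t <plan>.jmx [-Jname=value ...] -l <plan>_<id>_results.jtl -e -o <plan>_<id>_report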
# Log the full command for debugging
logger.debug(f"Executing command: {' '.join(cmd)}")
if non_gui:
# For non-GUI mode, capture output
result = subprocess.run(cmd, capture_output=True, text=True)
# Log output for debugging
logger.debug("Command output:")
logger.debug(f"Return code: {result.returncode}")
logger.debug(f"Stdout: {result.stdout}")
logger.debug(f"Stderr: {result.stderr}")
if result.returncode != 0:
return f"Error executing JMeter test:\n{result.stderr}"
return result.stdout
else:
# For GUI mode, start process without capturing output
subprocess.Popen(cmd)
return "JMeter GUI launched successfully"
except Exception as e:
return f"Unexpected error: {str(e)}"
@mcp.tool()
async def execute_jmeter_test(test_file: str, gui_mode: bool = False, properties: dict = None) -> str:
"""Execute a JMeter test.
Args:
test_file: Path to the JMeter test file (.jmx)
gui_mode: Whether to run in GUI mode (default: False)
properties: Dictionary of JMeter properties to pass with -J (default: None)
"""
return await run_jmeter(test_file, non_gui=not gui_mode, properties=properties) # Run in non-GUI mode by default
@mcp.tool()
async def execute_jmeter_test_non_gui(test_file: str, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str:
"""Execute a JMeter test in non-GUI mode - supports JMeter properties.
Args:
test_file: Path to the JMeter test file (.jmx)
properties: Dictionary of JMeter properties to pass with -J (default: None)
generate_report: Whether to generate report dashboard after load test (default: False)
report_output_dir: Output folder for report dashboard (default: None)
log_file: Name of JTL file to log sample results to (default: None)
"""
return await run_jmeter(test_file, non_gui=True, properties=properties, generate_report=generate_report, report_output_dir=report_output_dir, log_file=log_file)
# Import the analyzer module
from analyzer.models import TestResults, TimeSeriesMetrics, EndpointMetrics
from analyzer.analyzer import TestResultsAnalyzer
from analyzer.visualization.engine import VisualizationEngine
@mcp.tool()
async def analyze_jmeter_results(jtl_file: str, detailed: bool = False) -> str:
"""Analyze JMeter test results and provide a summary of key metrics and insights.
Args:
jtl_file: Path to the JTL file containing test results
detailed: Whether to include detailed analysis (default: False)
Returns:
str: Analysis results in a formatted string
"""
try:
analyzer = TestResultsAnalyzer()
# Validate file exists
file_path = Path(jtl_file)
if not file_path.exists():
return f"Error: JTL file not found: {jtl_file}"
try:
# Analyze the file
analysis_results = analyzer.analyze_file(file_path, detailed=detailed)
# Format the results as a string
result_str = f"Analysis of {jtl_file}:\n\n"
# Add summary information
summary = analysis_results.get("summary", {})
result_str += "Summary:\n"
result_str += f"- Total samples: {summary.get('total_samples', 'N/A')}\n"
result_str += f"- Error count: {summary.get('error_count', 'N/A')} ({summary.get('error_rate', 'N/A'):.2f}%)\n"
result_str += f"- Response times (ms):\n"
result_str += f" - Average: {summary.get('average_response_time', 'N/A'):.2f}\n"
result_str += f" - Median: {summary.get('median_response_time', 'N/A'):.2f}\n"
result_str += f" - 90th percentile: {summary.get('percentile_90', 'N/A'):.2f}\n"
result_str += f" - 95th percentile: {summary.get('percentile_95', 'N/A'):.2f}\n"
result_str += f" - 99th percentile: {summary.get('percentile_99', 'N/A'):.2f}\n"
result_str += f" - Min: {summary.get('min_response_time', 'N/A'):.2f}\n"
result_str += f" - Max: {summary.get('max_response_time', 'N/A'):.2f}\n"
result_str += f"- Throughput: {summary.get('throughput', 'N/A'):.2f} requests/second\n"
result_str += f"- Start time: {summary.get('start_time', 'N/A')}\n"
result_str += f"- End time: {summary.get('end_time', 'N/A')}\n"
result_str += f"- Duration: {summary.get('duration', 'N/A'):.2f} seconds\n\n"
# Add detailed information if requested
if detailed and "detailed" in analysis_results:
detailed_info = analysis_results["detailed"]
# Add endpoint information
endpoints = detailed_info.get("endpoints", {})
if endpoints:
result_str += "Endpoint Analysis:\n"
for endpoint, metrics in endpoints.items():
result_str += f"- {endpoint}:\n"
result_str += f" - Samples: {metrics.get('total_samples', 'N/A')}\n"
result_str += f" - Errors: {metrics.get('error_count', 'N/A')} ({metrics.get('error_rate', 'N/A'):.2f}%)\n"
result_str += f" - Average response time: {metrics.get('average_response_time', 'N/A'):.2f} ms\n"
result_str += f" - 95th percentile: {metrics.get('percentile_95', 'N/A'):.2f} ms\n"
result_str += f" - Throughput: {metrics.get('throughput', 'N/A'):.2f} requests/second\n"
result_str += "\n"
# Add bottleneck information
bottlenecks = detailed_info.get("bottlenecks", {})
if bottlenecks:
result_str += "Bottleneck Analysis:\n"
# Slow endpoints
slow_endpoints = bottlenecks.get("slow_endpoints", [])
if slow_endpoints:
result_str += "- Slow Endpoints:\n"
for endpoint in slow_endpoints:
result_str += f" - {endpoint.get('endpoint')}: {endpoint.get('response_time'):.2f} ms "
result_str += f"(Severity: {endpoint.get('severity')})\n"
result_str += "\n"
# Error-prone endpoints
error_endpoints = bottlenecks.get("error_prone_endpoints", [])
if error_endpoints:
result_str += "- Error-Prone Endpoints:\n"
for endpoint in error_endpoints:
result_str += f" - {endpoint.get('endpoint')}: {endpoint.get('error_rate'):.2f}% "
result_str += f"(Severity: {endpoint.get('severity')})\n"
result_str += "\n"
# Anomalies
anomalies = bottlenecks.get("anomalies", [])
if anomalies:
result_str += "- Response Time Anomalies:\n"
for anomaly in anomalies[:3]: # Show only top 3 anomalies
result_str += f" - At {anomaly.get('timestamp')}: "
result_str += f"Expected {anomaly.get('expected_value'):.2f} ms, "
result_str += f"Got {anomaly.get('actual_value'):.2f} ms "
result_str += f"({anomaly.get('deviation_percentage'):.2f}% deviation)\n"
result_str += "\n"
# Concurrency impact
concurrency = bottlenecks.get("concurrency_impact", {})
if concurrency:
result_str += "- Concurrency Impact:\n"
correlation = concurrency.get("correlation", 0)
result_str += f" - Correlation between threads and response time: {correlation:.2f}\n"
if concurrency.get("has_degradation", False):
result_str += f" - Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n"
else:
result_str += " - No significant performance degradation detected with increasing threads\n"
result_str += "\n"
# Add insights and recommendations
insights = detailed_info.get("insights", {})
if insights:
result_str += "Insights and Recommendations:\n"
# Recommendations
recommendations = insights.get("recommendations", [])
if recommendations:
result_str += "- Top Recommendations:\n"
for rec in recommendations[:3]: # Show only top 3 recommendations
result_str += f" - [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n"
result_str += f" Recommendation: {rec.get('recommendation')}\n"
result_str += f" Expected Impact: {rec.get('expected_impact')}\n"
result_str += "\n"
# Scaling insights
scaling_insights = insights.get("scaling_insights", [])
if scaling_insights:
result_str += "- Scaling Insights:\n"
for insight in scaling_insights[:2]: # Show only top 2 insights
result_str += f" - {insight.get('topic')}: {insight.get('description')}\n"
result_str += "\n"
# Add time series information (just a summary)
time_series = detailed_info.get("time_series", [])
if time_series:
result_str += "Time Series Analysis:\n"
result_str += f"- Intervals: {len(time_series)}\n"
result_str += f"- Interval duration: 5 seconds\n"
# Calculate average throughput and response time over intervals
avg_throughput = sum(ts.get('throughput', 0) for ts in time_series) / len(time_series)
avg_response_time = sum(ts.get('average_response_time', 0) for ts in time_series) / len(time_series)
result_str += f"- Average throughput over intervals: {avg_throughput:.2f} requests/second\n"
result_str += f"- Average response time over intervals: {avg_response_time:.2f} ms\n\n"
return result_str
except ValueError as e:
return f"Error analyzing JTL file: {str(e)}"
except Exception as e:
return f"Error analyzing JMeter results: {str(e)}"
@mcp.tool()
async def identify_performance_bottlenecks(jtl_file: str) -> str:
"""Identify performance bottlenecks in JMeter test results.
Args:
jtl_file: Path to the JTL file containing test results
Returns:
str: Bottleneck analysis results in a formatted string
"""
try:
analyzer = TestResultsAnalyzer()
# Validate file exists
file_path = Path(jtl_file)
if not file_path.exists():
return f"Error: JTL file not found: {jtl_file}"
try:
# Analyze the file with detailed analysis
analysis_results = analyzer.analyze_file(file_path, detailed=True)
# Format the results as a string
result_str = f"Performance Bottleneck Analysis of {jtl_file}:\n\n"
# Add bottleneck information
detailed_info = analysis_results.get("detailed", {})
bottlenecks = detailed_info.get("bottlenecks", {})
if not bottlenecks:
return f"No bottlenecks identified in {jtl_file}."
# Slow endpoints
slow_endpoints = bottlenecks.get("slow_endpoints", [])
if slow_endpoints:
result_str += "Slow Endpoints:\n"
for endpoint in slow_endpoints:
result_str += f"- {endpoint.get('endpoint')}: {endpoint.get('response_time'):.2f} ms "
result_str += f"(Severity: {endpoint.get('severity')})\n"
result_str += "\n"
else:
result_str += "No slow endpoints identified.\n\n"
# Error-prone endpoints
error_endpoints = bottlenecks.get("error_prone_endpoints", [])
if error_endpoints:
result_str += "Error-Prone Endpoints:\n"
for endpoint in error_endpoints:
result_str += f"- {endpoint.get('endpoint')}: {endpoint.get('error_rate'):.2f}% "
result_str += f"(Severity: {endpoint.get('severity')})\n"
result_str += "\n"
else:
result_str += "No error-prone endpoints identified.\n\n"
# Anomalies
anomalies = bottlenecks.get("anomalies", [])
if anomalies:
result_str += "Response Time Anomalies:\n"
for anomaly in anomalies:
result_str += f"- At {anomaly.get('timestamp')}: "
result_str += f"Expected {anomaly.get('expected_value'):.2f} ms, "
result_str += f"Got {anomaly.get('actual_value'):.2f} ms "
result_str += f"({anomaly.get('deviation_percentage'):.2f}% deviation)\n"
result_str += "\n"
else:
result_str += "No response time anomalies detected.\n\n"
# Concurrency impact
concurrency = bottlenecks.get("concurrency_impact", {})
if concurrency:
result_str += "Concurrency Impact:\n"
correlation = concurrency.get("correlation", 0)
result_str += f"- Correlation between threads and response time: {correlation:.2f}\n"
if concurrency.get("has_degradation", False):
result_str += f"- Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n"
else:
result_str += "- No significant performance degradation detected with increasing threads\n"
result_str += "\n"
# Add recommendations
insights = detailed_info.get("insights", {})
recommendations = insights.get("recommendations", [])
if recommendations:
result_str += "Recommendations:\n"
for rec in recommendations[:5]: # Show top 5 recommendations
result_str += f"- [{rec.get('priority_level', 'medium').upper()}] {rec.get('recommendation')}\n"
else:
result_str += "No specific recommendations available.\n"
return result_str
except ValueError as e:
return f"Error analyzing JTL file: {str(e)}"
except Exception as e:
return f"Error identifying performance bottlenecks: {str(e)}"
@mcp.tool()
async def get_performance_insights(jtl_file: str) -> str:
"""Get insights and recommendations for improving performance based on JMeter test results.
Args:
jtl_file: Path to the JTL file containing test results
Returns:
str: Performance insights and recommendations in a formatted string
"""
try:
analyzer = TestResultsAnalyzer()
# Validate file exists
file_path = Path(jtl_file)
if not file_path.exists():
return f"Error: JTL file not found: {jtl_file}"
try:
# Analyze the file with detailed analysis
analysis_results = analyzer.analyze_file(file_path, detailed=True)
# Format the results as a string
result_str = f"Performance Insights for {jtl_file}:\n\n"
# Add insights information
detailed_info = analysis_results.get("detailed", {})
insights = detailed_info.get("insights", {})
if not insights:
return f"No insights available for {jtl_file}."
# Recommendations
recommendations = insights.get("recommendations", [])
if recommendations:
result_str += "Recommendations:\n"
for i, rec in enumerate(recommendations[:5], 1): # Show top 5 recommendations
result_str += f"{i}. [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n"
result_str += f" - Recommendation: {rec.get('recommendation')}\n"
result_str += f" - Expected Impact: {rec.get('expected_impact')}\n"
result_str += f" - Implementation Difficulty: {rec.get('implementation_difficulty')}\n\n"
else:
result_str += "No specific recommendations available.\n\n"
# Scaling insights
scaling_insights = insights.get("scaling_insights", [])
if scaling_insights:
result_str += "Scaling Insights:\n"
for i, insight in enumerate(scaling_insights, 1):
result_str += f"{i}. {insight.get('topic')}\n"
result_str += f" {insight.get('description')}\n\n"
else:
result_str += "No scaling insights available.\n\n"
# Add summary metrics for context
summary = analysis_results.get("summary", {})
result_str += "Test Summary:\n"
result_str += f"- Total samples: {summary.get('total_samples', 'N/A')}\n"
result_str += f"- Error rate: {summary.get('error_rate', 'N/A'):.2f}%\n"
result_str += f"- Average response time: {summary.get('average_response_time', 'N/A'):.2f} ms\n"
result_str += f"- 95th percentile: {summary.get('percentile_95', 'N/A'):.2f} ms\n"
result_str += f"- Throughput: {summary.get('throughput', 'N/A'):.2f} requests/second\n"
return result_str
except ValueError as e:
return f"Error analyzing JTL file: {str(e)}"
except Exception as e:
return f"Error getting performance insights: {str(e)}"
@mcp.tool()
async def generate_visualization(jtl_file: str, visualization_type: str, output_file: str) -> str:
"""Generate visualizations of JMeter test results.
Args:
jtl_file: Path to the JTL file containing test results
visualization_type: Type of visualization to generate (time_series, distribution, comparison, html_report)
output_file: Path to save the visualization
Returns:
str: Path to the generated visualization file
"""
try:
analyzer = TestResultsAnalyzer()
# Validate file exists
file_path = Path(jtl_file)
if not file_path.exists():
return f"Error: JTL file not found: {jtl_file}"
try:
# Analyze the file with detailed analysis
analysis_results = analyzer.analyze_file(file_path, detailed=True)
# Create visualization engine
output_dir = os.path.dirname(output_file) if output_file else None
engine = VisualizationEngine(output_dir=output_dir)
# Generate visualization based on type
if visualization_type == "time_series":
# Extract time series metrics
time_series = analysis_results.get("detailed", {}).get("time_series", [])
if not time_series:
return "No time series data available for visualization."
# Convert to TimeSeriesMetrics objects
metrics = []
for ts_data in time_series:
metrics.append(TimeSeriesMetrics(
timestamp=datetime.datetime.fromisoformat(ts_data["timestamp"]),
active_threads=ts_data["active_threads"],
throughput=ts_data["throughput"],
average_response_time=ts_data["average_response_time"],
error_rate=ts_data["error_rate"]
))
# Create visualization
output_path = engine.create_time_series_graph(
metrics, metric_name="average_response_time", output_file=output_file)
return f"Time series graph generated: {output_path}"
elif visualization_type == "distribution":
# Extract response times
samples = []
for endpoint, metrics in analysis_results.get("detailed", {}).get("endpoints", {}).items():
samples.extend([metrics["average_response_time"]] * metrics["total_samples"])
if not samples:
return "No response time data available for visualization."
# Create visualization
output_path = engine.create_distribution_graph(samples, output_file=output_file)
return f"Distribution graph generated: {output_path}"
elif visualization_type == "comparison":
# Extract endpoint metrics
endpoints = analysis_results.get("detailed", {}).get("endpoints", {})
if not endpoints:
return "No endpoint data available for visualization."
# Convert to EndpointMetrics objects
endpoint_metrics = {}
for endpoint, metrics_data in endpoints.items():
endpoint_metrics[endpoint] = EndpointMetrics(
endpoint=endpoint,
total_samples=metrics_data["total_samples"],
error_count=metrics_data["error_count"],
error_rate=metrics_data["error_rate"],
average_response_time=metrics_data["average_response_time"],
median_response_time=metrics_data["median_response_time"],
percentile_90=metrics_data["percentile_90"],
percentile_95=metrics_data["percentile_95"],
percentile_99=metrics_data["percentile_99"],
min_response_time=metrics_data["min_response_time"],
max_response_time=metrics_data["max_response_time"],
throughput=metrics_data["throughput"],
test_duration=analysis_results["summary"]["duration"]
)
# Create visualization
output_path = engine.create_endpoint_comparison_chart(
endpoint_metrics, metric_name="average_response_time", output_file=output_file)
return f"Endpoint comparison chart generated: {output_path}"
elif visualization_type == "html_report":
# Create HTML report
output_path = engine.create_html_report(analysis_results, output_file)
return f"HTML report generated: {output_path}"
else:
return f"Unknown visualization type: {visualization_type}. " \
f"Supported types: time_series, distribution, comparison, html_report"
except ValueError as e:
return f"Error generating visualization: {str(e)}"
except Exception as e:
return f"Error generating visualization: {str(e)}"
def generate_unique_id():
"""
Generate a unique identifier using timestamp and UUID.
Returns:
str: A unique identifier string
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
random_id = str(uuid.uuid4())[:8] # Use first 8 chars of UUID for brevity
return f"{timestamp}_{random_id}"
if __name__ == "__main__":
mcp.run(transport='stdio')
```
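Finally, a hedged sketch of calling the `run_jmeter` helper directly, outside of MCP; the test plan path and the `-J` property names are placeholders, and the `JMETER_BIN` environment variable must point at a working JMeter binary (otherwise the code falls back to `jmeter` on the PATH):

```python
import asyncio

# Assumes jmeter_server.py is importable from the project root.
from jmeter_server import run_jmeter

async def main() -> None:
    # "test_plan.jmx" and the property names are illustrative only.
    output = await run_jmeter(
        "test_plan.jmx",
        non_gui=True,
        properties={"threads": "10", "rampup": "5"},
        generate_report=True,
    )
    print(output)

asyncio.run(main())
```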