# Directory Structure
```
├── .gitignore
├── .python-version
├── package.json
├── pyproject.toml
├── README.md
├── setup.py
├── src
│ ├── mcp_dev_server
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── core
│ │ │ ├── __init__.py
│ │ │ └── server.py
│ │ ├── docker
│ │ │ ├── manager.py
│ │ │ ├── streams.py
│ │ │ ├── templates
│ │ │ │ ├── dev.dockerfile
│ │ │ │ ├── node.dockerfile
│ │ │ │ └── python.dockerfile
│ │ │ ├── templates.py
│ │ │ ├── volumes.py
│ │ │ └── xxx.py
│ │ ├── environments
│ │ │ ├── manager.py
│ │ │ ├── tools.py
│ │ │ └── workflow.py
│ │ ├── handlers
│ │ │ ├── __init__.py
│ │ │ └── input_request_handler.py
│ │ ├── managers
│ │ │ ├── __init__.py
│ │ │ ├── base_manager.py
│ │ │ ├── build_manager.py
│ │ │ ├── dependency_manager.py
│ │ │ ├── project_manager.py
│ │ │ ├── template_manager.py
│ │ │ ├── test_manager.py
│ │ │ └── workflow_manager.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── config.py
│ │ │ ├── errors.py
│ │ │ └── input_response.py
│ │ ├── package
│ │ │ └── manager.py
│ │ ├── project_manager
│ │ │ ├── base_project.py
│ │ │ ├── context.py
│ │ │ ├── git.py
│ │ │ ├── manager.py
│ │ │ ├── project_types.py
│ │ │ ├── project.py
│ │ │ └── templates.py
│ │ ├── prompts
│ │ │ ├── handler.py
│ │ │ ├── input_protocol.py
│ │ │ ├── project_templates.py
│ │ │ └── templates.py
│ │ ├── server.py
│ │ ├── test
│ │ │ └── manager.py
│ │ ├── utils
│ │ │ ├── __init__.py
│ │ │ ├── config.py
│ │ │ ├── errors.py
│ │ │ └── logging.py
│ │ └── workflow
│ │ └── manager.py
│ └── resources
│ └── templates
│ └── basic
│ └── files
├── tests
│ └── test_integration.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python
__pycache__/
*.py[cod]
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.env
.venv
env/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Project specific
*.log
.docker/
.pytest_cache/
.coverage
htmlcov/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# MCP Development Server
A Model Context Protocol (MCP) server that enables Claude to manage software development projects, providing complete project context awareness and handling code execution through Docker environments.
## Features
### Core Infrastructure
- Project context management
- File system operations
- Template-based project creation
- Git integration
### Requirements
- Python 3.12 or higher
- Docker
- Git
## Installation
```bash
# Using pip
pip install mcp-dev-server
# Development installation
git clone https://github.com/your-org/mcp-dev-server.git
cd mcp-dev-server
pip install -e .
```
## Configuration
### Claude Desktop Configuration
Add to your Claude Desktop configuration file:
On MacOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
```json
{
"mcpServers": {
"dev": {
"command": "mcp-dev-server",
"args": []
}
}
}
```
## Usage
The server provides several MCP capabilities:
### Resources
- Project structure and files
- Build status and artifacts
- Test results
- Docker container status
### Tools
- Project initialization
- Build operations
- Test execution
- Docker commands
### Prompts
- Project analysis
- Development suggestions
- Error diagnosis
## Development
### Setting up development environment
```bash
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dependencies
pip install -e ".[dev]"
```
### Running tests
```bash
pytest tests/
```
## Contributing
Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct and the process for submitting pull requests.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/xxx.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/core/__init__.py:
--------------------------------------------------------------------------------
```python
from .server import Server
__all__ = ['Server']
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/input_protocol.py:
--------------------------------------------------------------------------------
```python
"""Input request protocol for MCP server."""
[Previous content...]
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/handlers/__init__.py:
--------------------------------------------------------------------------------
```python
from .input_request_handler import InputRequestHandler
__all__ = ['InputRequestHandler']
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/models/errors.py:
--------------------------------------------------------------------------------
```python
class MCPDevServerError(Exception):
"""Base exception class for MCP Development Server errors."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/__main__.py:
--------------------------------------------------------------------------------
```python
"""Main entry point when run with python -m mcp_dev_server"""
from . import main
if __name__ == '__main__':
main()
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/test_manager.py:
--------------------------------------------------------------------------------
```python
class TestManager:
"""Manager class for test-related operations."""
def __init__(self):
"""Initialize the test manager."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/build_manager.py:
--------------------------------------------------------------------------------
```python
class BuildManager:
"""Manager class for build-related operations."""
def __init__(self):
"""Initialize the build manager."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/models/__init__.py:
--------------------------------------------------------------------------------
```python
from .config import Config
from .input_response import InputResponse
from .errors import MCPDevServerError
__all__ = ['Config', 'InputResponse', 'MCPDevServerError']
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/template_manager.py:
--------------------------------------------------------------------------------
```python
class TemplateManager:
"""Manager class for template-related operations."""
def __init__(self):
"""Initialize the template manager."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/workflow_manager.py:
--------------------------------------------------------------------------------
```python
class WorkflowManager:
"""Manager class for workflow-related operations."""
def __init__(self):
"""Initialize the workflow manager."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/dependency_manager.py:
--------------------------------------------------------------------------------
```python
class DependencyManager:
"""Manager class for dependency-related operations."""
def __init__(self):
"""Initialize the dependency manager."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/project_manager.py:
--------------------------------------------------------------------------------
```python
from ..models import Config
class ProjectManager:
"""Manager class for project-related operations."""
def __init__(self, config: Config):
"""Initialize the project manager.
Args:
config: Server configuration
"""
self.config = config
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/__init__.py:
--------------------------------------------------------------------------------
```python
from .project_manager import ProjectManager
from .template_manager import TemplateManager
from .build_manager import BuildManager
from .dependency_manager import DependencyManager
from .test_manager import TestManager
from .workflow_manager import WorkflowManager
__all__ = [
'ProjectManager',
'TemplateManager',
'BuildManager',
'DependencyManager',
'TestManager',
'WorkflowManager'
]
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/base_manager.py:
--------------------------------------------------------------------------------
```python
"""Base manager class with common functionality."""
import uuid
from typing import Dict, Any
class BaseManager:
"""Base class for all managers."""
def _generate_id(self) -> str:
"""Generate a unique identifier.
Returns:
str: Unique identifier
"""
return str(uuid.uuid4())
async def cleanup(self):
"""Clean up resources. Override in subclasses."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/__init__.py:
--------------------------------------------------------------------------------
```python
"""MCP Development Server Package."""
from . import server
import asyncio
from typing import Optional
from .utils.logging import setup_logging
logger = setup_logging(__name__)
def main():
"""Main entry point for the package."""
try:
server_instance = server.MCPDevServer()
asyncio.run(server_instance.run())
except KeyboardInterrupt:
logger.info("Server shutdown requested")
except Exception as e:
logger.error(f"Server error: {str(e)}")
raise
# Expose key components at package level
__all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
from setuptools import setup, find_packages
setup(
name="mcp-dev-server",
version="0.1.0",
packages=find_packages(where="src"),
package_dir={"": "src"},
install_requires=[
"mcp", # Base MCP package
"aiohttp>=3.8.0",
"websockets>=10.0",
"uvicorn>=0.15.0",
"fastapi>=0.68.0",
"typing_extensions>=4.5.0",
],
entry_points={
"console_scripts": [
"mcp-dev-server=mcp_dev_server:main",
],
},
python_requires=">=3.8",
author="Your Name",
description="MCP Development Server"
)
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/models/input_response.py:
--------------------------------------------------------------------------------
```python
from typing import Any, Dict
class InputResponse:
"""Class representing a user's input response."""
def __init__(self, request_id: str, values: Dict[str, Any]):
"""Initialize an input response.
Args:
request_id: ID of the input request
values: Dictionary of input values
"""
self.request_id = request_id
self.values = values
def validate(self) -> bool:
"""Validate the input response.
Returns:
bool: True if valid, False otherwise
"""
return True # TODO: Implement validation
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/errors.py:
--------------------------------------------------------------------------------
```python
"""Error definitions for MCP Development Server."""
class MCPDevServerError(Exception):
"""Base error class for MCP Development Server."""
pass
class ProjectError(MCPDevServerError):
"""Project-related errors."""
pass
class BuildError(MCPDevServerError):
"""Build-related errors."""
pass
class TestError(MCPDevServerError):
"""Test-related errors."""
pass
class EnvironmentError(MCPDevServerError):
"""Environment-related errors."""
pass
class ConfigurationError(MCPDevServerError):
"""Configuration-related errors."""
pass
class WorkflowError(MCPDevServerError):
"""Workflow-related errors."""
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/node.dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Node.js development environment
FROM node:{{ node_version }}
# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
curl \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /workspace
{% if package_file %}
# Install Node.js dependencies
COPY {{ package_file }} .
{% if package_lock %}
COPY {{ package_lock }} .
RUN npm ci
{% else %}
RUN npm install
{% endif %}
{% endif %}
{% if global_packages %}
# Install global packages
RUN npm install -g {% for package in global_packages %}{{ package }} {% endfor %}
{% endif %}
# Set Node.js environment variables
ENV NODE_ENV=development
{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/models/config.py:
--------------------------------------------------------------------------------
```python
class Config:
"""Configuration class for MCP Development Server."""
def __init__(self):
"""Initialize configuration with default values."""
self.host = "localhost"
self.port = 8000
self.debug = False
def load_from_file(self, file_path: str):
"""Load configuration from a file.
Args:
file_path: Path to configuration file
"""
pass # TODO: Implement configuration loading
def save_to_file(self, file_path: str):
"""Save current configuration to a file.
Args:
file_path: Path to save configuration
"""
pass # TODO: Implement configuration saving
```
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp-dev-server",
"version": "1.0.0",
"description": "Model Context Protocol Development Server",
"main": "dist/app.js",
"scripts": {
"start": "node dist/app.js",
"dev": "nodemon src/app.ts",
"build": "tsc",
"test": "jest",
"lint": "eslint . --ext .ts"
},
"dependencies": {
"express": "^4.18.2",
"typescript": "^5.0.0",
"mongoose": "^7.0.0",
"dotenv": "^16.0.0",
"winston": "^3.8.0",
"cors": "^2.8.5",
"helmet": "^6.0.0",
"joi": "^17.0.0"
},
"devDependencies": {
"@types/express": "^4.17.17",
"@types/node": "^18.0.0",
"@types/jest": "^29.0.0",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^8.0.0",
"jest": "^29.0.0",
"nodemon": "^2.0.0",
"ts-jest": "^29.0.0",
"ts-node": "^10.0.0"
}
}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/python.dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Python development environment
FROM python:{{ python_version }}-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
curl \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /workspace
{% if install_poetry %}
# Install Poetry
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH="/root/.local/bin:$PATH"
{% endif %}
{% if requirements_file %}
# Install Python dependencies
COPY {{ requirements_file }} .
RUN pip install -r {{ requirements_file }}
{% endif %}
{% if additional_packages %}
# Install additional packages
RUN pip install {% for package in additional_packages %}{{ package }} {% endfor %}
{% endif %}
# Set Python environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1
{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/logging.py:
--------------------------------------------------------------------------------
```python
"""Logging configuration for MCP Development Server."""
import logging
import sys
from typing import Optional
def setup_logging(name: Optional[str] = None, level: int = logging.INFO) -> logging.Logger:
"""Setup logging configuration.
Args:
name: Logger name
level: Logging level
Returns:
logging.Logger: Configured logger instance
"""
# Create logger
logger = logging.getLogger(name or __name__)
logger.setLevel(level)
# Create stderr handler (MCP protocol requires clean stdout)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(level)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
return logger
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/handlers/input_request_handler.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any, Optional
from ..models import InputResponse
class InputRequestHandler:
"""Handler for input requests."""
def __init__(self):
"""Initialize the input request handler."""
pass
async def request_input(self, request_type: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Request input from the user.
Args:
request_type: Type of input request
context: Additional context for request
Returns:
Dict[str, Any]: User's input values
"""
return {} # TODO: Implement input request handling
def handle_response(self, response: InputResponse):
"""Handle input response from user.
Args:
response: User's response
"""
pass # TODO: Implement response handling
```
--------------------------------------------------------------------------------
/tests/test_integration.py:
--------------------------------------------------------------------------------
```python
"""Test MCP server integration with Claude."""
import asyncio
import pytest
from mcp_dev_server.server import MCPDevServer
from mcp_dev_server.utils.config import Config
@pytest.mark.asyncio
async def test_server_initialization():
"""Test server initialization."""
config = Config()
server = MCPDevServer()
# Test project creation
project = await server.project_manager.create_project(
name="test-project",
project_type="python",
project_config={
"python_version": "3.12",
"project_type": "fastapi",
"dependency_management": "poetry"
}
)
assert project is not None
assert project.config["name"] == "test-project"
# Test tool execution
result = await server.handle_call_tool("build", {
"environment": "default",
"command": "build"
})
assert result[0].type == "text"
# Cleanup
await server.cleanup()
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates.py:
--------------------------------------------------------------------------------
```python
"""Dockerfile templates for different environments."""
from typing import Dict, Optional
from jinja2 import Template
class DockerTemplates:
"""Manages Dockerfile templates for different environments."""
@staticmethod
def get_template(environment: str, config: Optional[Dict[str, Any]] = None) -> str:
"""Get Dockerfile template for specific environment."""
config = config or {}
if environment == "python":
return Template("""
FROM python:{{ python_version|default('3.12-slim') }}
WORKDIR /app
{% if requirements_file %}
COPY {{ requirements_file }} .
RUN pip install -r {{ requirements_file }}
{% endif %}
{% if install_dev_deps %}
RUN pip install pytest mypy black
{% endif %}
{% for cmd in additional_commands|default([]) %}
RUN {{ cmd }}
{% endfor %}
COPY . .
CMD ["python", "{{ entry_point|default('main.py') }}"]
""").render(config)
elif environment == "node":
return Template("""
FROM node:{{ node_version|default('20-slim') }}
WORKDIR /app
COPY package*.json ./
RUN npm install {% if install_dev_deps %}--include=dev{% endif %}
{% for cmd in additional_commands|default([]) %}
RUN {{ cmd }}
{% endfor %}
COPY . .
CMD ["npm", "{{ npm_command|default('start') }}"]
""").render(config)
else:
raise ValueError(f"Unknown environment: {environment}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/dev.dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Multi-language development environment
FROM ubuntu:{{ ubuntu_version }}
# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
curl \
build-essential \
software-properties-common \
&& rm -rf /var/lib/apt/lists/*
{% if install_python %}
# Install Python
RUN add-apt-repository ppa:deadsnakes/ppa && \
apt-get update && \
apt-get install -y python{{ python_version }} python{{ python_version }}-venv python{{ python_version }}-dev && \
rm -rf /var/lib/apt/lists/*
{% endif %}
{% if install_node %}
# Install Node.js
RUN curl -fsSL https://deb.nodesource.com/setup_{{ node_version }}.x | bash - && \
apt-get install -y nodejs && \
rm -rf /var/lib/apt/lists/*
{% endif %}
{% if install_docker %}
# Install Docker
RUN curl -fsSL https://get.docker.com | sh && \
rm -rf /var/lib/apt/lists/*
{% endif %}
# Set working directory
WORKDIR /workspace
{% if requirements_file %}
# Install Python dependencies
COPY {{ requirements_file }} .
RUN pip{{ python_version }} install -r {{ requirements_file }}
{% endif %}
{% if package_file %}
# Install Node.js dependencies
COPY {{ package_file }} .
{% if package_lock %}
COPY {{ package_lock }} .
RUN npm ci
{% else %}
RUN npm install
{% endif %}
{% endif %}
{% if additional_tools %}
# Install additional tools
RUN apt-get update && apt-get install -y \
{% for tool in additional_tools %}{{ tool }} {% endfor %} \
&& rm -rf /var/lib/apt/lists/*
{% endif %}
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
NODE_ENV=development
{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/handler.py:
--------------------------------------------------------------------------------
```python
[Previous handler.py content...]
async def process_field_dependencies(self, request: InputRequest, field_updates: Dict[str, Any]):
"""Process field dependencies based on user input.
Some fields might need to be updated based on values of other fields.
For example, if user selects Python as language, we need to show Python version field.
Args:
request: Current input request
field_updates: Updated field values
"""
try:
if request.request_id == "environment_setup":
language = field_updates.get("language")
if language:
# Update required fields based on language selection
for field in request.fields:
if field.name == "python_version":
field.required = language in ["python", "both"]
elif field.name == "node_version":
field.required = language in ["node", "both"]
elif request.request_id == "test_configuration":
test_framework = field_updates.get("test_framework")
if test_framework:
# Update coverage options based on test framework
for field in request.fields:
if field.name == "include_coverage":
field.options = self._get_coverage_options(test_framework)
def _get_coverage_options(self, framework: str) -> List[Dict[str, str]]:
"""Get coverage tool options based on test framework."""
coverage_tools = {
"pytest": [
{"value": "pytest-cov", "label": "pytest-cov"},
{"value": "coverage", "label": "coverage.py"}
],
"unittest": [
{"value": "coverage", "label": "coverage.py"}
],
"jest": [
{"value": "jest-coverage", "label": "Jest Coverage"}
],
"mocha": [
{"value": "nyc", "label": "Istanbul/nyc"}
]
}
return coverage_tools.get(framework, [])
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/config.py:
--------------------------------------------------------------------------------
```python
"""Configuration management for MCP Development Server."""
import os
import json
from typing import Dict, Any, Optional
from pathlib import Path
class Config:
"""Configuration manager."""
def __init__(self):
"""Initialize configuration."""
self.config_dir = self._get_config_dir()
self.config_file = self.config_dir / "config.json"
self.config: Dict[str, Any] = self._load_config()
def _get_config_dir(self) -> Path:
"""Get configuration directory path."""
if os.name == "nt": # Windows
config_dir = Path(os.getenv("APPDATA")) / "Claude"
else: # macOS/Linux
config_dir = Path.home() / ".config" / "claude"
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file."""
if self.config_file.exists():
try:
with open(self.config_file, "r") as f:
return json.load(f)
except Exception as e:
print(f"Error loading config: {e}")
return self._get_default_config()
else:
config = self._get_default_config()
self._save_config(config)
return config
def _save_config(self, config: Dict[str, Any]):
"""Save configuration to file."""
try:
with open(self.config_file, "w") as f:
json.dump(config, f, indent=2)
except Exception as e:
print(f"Error saving config: {e}")
def _get_default_config(self) -> Dict[str, Any]:
"""Get default configuration."""
return {
"projectsDir": str(Path.home() / "Projects"),
"templatesDir": str(self.config_dir / "templates"),
"environments": {
"default": {
"type": "docker",
"image": "python:3.12-slim"
}
}
}
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value."""
return self.config.get(key, default)
def set(self, key: str, value: Any):
"""Set configuration value."""
self.config[key] = value
self._save_config(self.config)
def update(self, updates: Dict[str, Any]):
"""Update multiple configuration values."""
self.config.update(updates)
self._save_config(self.config)
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/volumes.py:
--------------------------------------------------------------------------------
```python
"""Docker volume management for MCP Development Server."""
from typing import Dict, List, Optional
import docker
from docker.errors import DockerException
from ..utils.logging import setup_logging
from ..utils.errors import DockerError
logger = setup_logging(__name__)
class VolumeManager:
"""Manages Docker volumes for development environments."""
def __init__(self):
self.client = docker.from_env()
async def create_volume(
self,
name: str,
labels: Optional[Dict[str, str]] = None
) -> str:
"""Create a Docker volume."""
try:
volume = self.client.volumes.create(
name=name,
driver='local',
labels=labels or {}
)
logger.info(f"Created volume: {name}")
return volume.name
except DockerException as e:
raise DockerError(f"Failed to create volume: {str(e)}")
async def remove_volume(self, name: str) -> None:
"""Remove a Docker volume."""
try:
volume = self.client.volumes.get(name)
volume.remove()
logger.info(f"Removed volume: {name}")
except DockerException as e:
raise DockerError(f"Failed to remove volume: {str(e)}")
async def list_volumes(
self,
filters: Optional[Dict[str, str]] = None
) -> List[Dict[str, Any]]:
"""List Docker volumes."""
try:
volumes = self.client.volumes.list(filters=filters or {})
return [
{
"name": v.name,
"driver": v.attrs['Driver'],
"mountpoint": v.attrs['Mountpoint'],
"labels": v.attrs['Labels'] or {}
}
for v in volumes
]
except DockerException as e:
raise DockerError(f"Failed to list volumes: {str(e)}")
async def get_volume_info(self, name: str) -> Dict[str, Any]:
"""Get detailed information about a volume."""
try:
volume = self.client.volumes.get(name)
return {
"name": volume.name,
"driver": volume.attrs['Driver'],
"mountpoint": volume.attrs['Mountpoint'],
"labels": volume.attrs['Labels'] or {},
"scope": volume.attrs['Scope'],
"status": volume.attrs.get('Status', {})
}
except DockerException as e:
raise DockerError(f"Failed to get volume info: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/package/manager.py:
--------------------------------------------------------------------------------
```python
"""Package management integration for MCP Development Server."""
from typing import Dict, List, Optional, Any
from enum import Enum
from ..utils.errors import PackageError
from ..utils.logging import setup_logging
logger = setup_logging(__name__)
class PackageManager(str, Enum):
"""Supported package managers."""
NPM = "npm"
PIP = "pip"
CARGO = "cargo"
class DependencyManager:
"""Manages project dependencies."""
def __init__(self, env_manager):
self.env_manager = env_manager
async def install_dependencies(
self,
environment: str,
package_manager: PackageManager,
dependencies: List[str],
dev: bool = False
) -> Dict[str, Any]:
"""Install project dependencies."""
try:
command = self._build_install_command(
package_manager,
dependencies,
dev
)
result = await self.env_manager.execute_in_environment(
environment,
command
)
return {
"success": result["exit_code"] == 0,
"output": result["output"],
"error": result.get("error")
}
except Exception as e:
raise PackageError(f"Failed to install dependencies: {str(e)}")
async def update_dependencies(
self,
environment: str,
package_manager: PackageManager,
dependencies: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Update project dependencies."""
try:
command = self._build_update_command(package_manager, dependencies)
result = await self.env_manager.execute_in_environment(
environment,
command
)
return {
"success": result["exit_code"] == 0,
"output": result["output"],
"error": result.get("error")
}
except Exception as e:
raise PackageError(f"Failed to update dependencies: {str(e)}")
def _build_install_command(
self,
package_manager: PackageManager,
dependencies: List[str],
dev: bool
) -> str:
"""Build dependency installation command."""
if package_manager == PackageManager.NPM:
dev_flag = "--save-dev" if dev else ""
deps = " ".join(dependencies)
return f"npm install {dev_flag} {deps}"
elif package_manager == PackageManager.PIP:
dev_flag = "-D" if dev else ""
deps = " ".join(dependencies)
return f"pip install {dev_flag} {deps}"
elif package_manager == PackageManager.CARGO:
dev_flag = "--dev" if dev else ""
deps = " ".join(dependencies)
return f"cargo add {dev_flag} {deps}"
else:
raise PackageError(f"Unsupported package manager: {package_manager}")
def _build_update_command(
self,
package_manager: PackageManager,
dependencies: Optional[List[str]] = None
) -> str:
"""Build dependency update command."""
if package_manager == PackageManager.NPM:
return "npm update" if not dependencies else f"npm update {' '.join(dependencies)}"
elif package_manager == PackageManager.PIP:
return "pip install -U -r requirements.txt" if not dependencies else f"pip install -U {' '.join(dependencies)}"
elif package_manager == PackageManager.CARGO:
return "cargo update" if not dependencies else f"cargo update {' '.join(dependencies)}"
else:
raise PackageError(f"Unsupported package manager: {package_manager}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/test/manager.py:
--------------------------------------------------------------------------------
```python
"""Test system integration for MCP Development Server."""
import asyncio
from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
from ..utils.errors import TestError
from ..utils.logging import setup_logging
logger = setup_logging(__name__)
class TestStatus(str, Enum):
"""Test execution status."""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
ERROR = "error"
class TestManager:
"""Manages test execution and reporting."""
def __init__(self, env_manager):
self.env_manager = env_manager
self.test_runs: Dict[str, Dict[str, Any]] = {}
async def run_tests(
self,
environment: str,
config: Dict[str, Any]
) -> str:
"""Start a test run."""
try:
test_id = f"test_{len(self.test_runs)}"
# Initialize test run
self.test_runs[test_id] = {
"environment": environment,
"config": config,
"status": TestStatus.PENDING,
"results": [],
"start_time": datetime.now(),
"end_time": None
}
# Start test execution
asyncio.create_task(self._execute_tests(test_id))
return test_id
except Exception as e:
raise TestError(f"Failed to start tests: {str(e)}")
async def _execute_tests(self, test_id: str) -> None:
"""Execute test suite."""
try:
test_run = self.test_runs[test_id]
test_run["status"] = TestStatus.RUNNING
# Run test command
result = await self.env_manager.execute_in_environment(
test_run["environment"],
test_run["config"].get("command", "npm test"),
workdir=test_run["config"].get("workdir")
)
# Parse and store results
test_run["results"] = self._parse_test_output(
result["output"],
test_run["config"].get("format", "jest")
)
# Update test status
test_run["end_time"] = datetime.now()
test_run["status"] = (
TestStatus.SUCCESS
if result["exit_code"] == 0
else TestStatus.FAILED
)
except Exception as e:
logger.error(f"Test execution error: {str(e)}")
test_run["status"] = TestStatus.ERROR
test_run["error"] = str(e)
async def get_test_status(self, test_id: str) -> Dict[str, Any]:
"""Get status and results of a test run."""
if test_run := self.test_runs.get(test_id):
return {
"id": test_id,
"status": test_run["status"],
"results": test_run["results"],
"start_time": test_run["start_time"],
"end_time": test_run["end_time"],
"error": test_run.get("error")
}
raise TestError(f"Test run not found: {test_id}")
def _parse_test_output(
self,
output: str,
format: str
) -> List[Dict[str, Any]]:
"""Parse test output into structured results."""
if format == "jest":
return self._parse_jest_output(output)
elif format == "pytest":
return self._parse_pytest_output(output)
else:
logger.warning(f"Unknown test output format: {format}")
return [{"raw_output": output}]
def _parse_jest_output(self, output: str) -> List[Dict[str, Any]]:
"""Parse Jest test output."""
results = []
# Implement Jest output parsing
return results
def _parse_pytest_output(self, output: str) -> List[Dict[str, Any]]:
"""Parse pytest output."""
results = []
# Implement pytest output parsing
return results
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/server.py:
--------------------------------------------------------------------------------
```python
"""MCP Development Server implementation."""
from typing import Dict, Any, Optional, Sequence
import logging
import sys
import json
# Import MCP components
from mcp.server import Server as MCPServer
from mcp.server.stdio import stdio_server
import mcp.types as types
from .models import Config, InputResponse, MCPDevServerError
from .managers import (
ProjectManager,
TemplateManager,
BuildManager,
DependencyManager,
TestManager,
WorkflowManager
)
from .handlers import InputRequestHandler
# Configure logging to stderr to keep stdout clean
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG) # Set to DEBUG for development
class MCPDevServer:
"""MCP Development Server implementation."""
def __init__(self):
"""Initialize the MCP Development Server."""
logger.info("Initializing MCP Development Server")
try:
# Initialize server
self.server = MCPServer("mcp-dev-server")
# Initialize configuration
self.config = Config()
# Initialize all managers
self.project_manager = ProjectManager(self.config)
self.template_manager = TemplateManager()
self.build_manager = BuildManager()
self.dependency_manager = DependencyManager()
self.test_manager = TestManager()
self.workflow_manager = WorkflowManager()
self.input_handler = InputRequestHandler()
# Setup request handlers
self._setup_resource_handlers()
self._setup_tool_handlers()
self._setup_prompt_handlers()
logger.info("Server initialization completed successfully")
except Exception as e:
logger.error(f"Failed to initialize server: {e}")
raise
def _setup_resource_handlers(self):
"""Set up resource request handlers."""
@self.server.list_resources()
async def list_resources() -> list[types.Resource]:
"""List available resources."""
logger.debug("Listing resources")
return []
@self.server.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource content."""
logger.debug(f"Reading resource: {uri}")
return ""
def _setup_tool_handlers(self):
"""Set up tool request handlers."""
@self.server.list_tools()
async def list_tools() -> list[types.Tool]:
"""List available tools."""
logger.debug("Listing tools")
return []
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> Sequence[types.TextContent]:
"""Execute a tool."""
logger.debug(f"Calling tool {name} with arguments {arguments}")
return [types.TextContent(type="text", text="Tool execution result")]
def _setup_prompt_handlers(self):
"""Set up prompt request handlers."""
@self.server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
"""List available prompts."""
logger.debug("Listing prompts")
return []
async def run(self):
"""Run the MCP Development Server."""
try:
logger.info(f"Starting {self.server.name}...")
# Use stdio transport
async with stdio_server() as streams:
logger.info("Using stdio transport")
await self.server.run(
streams[0], # read stream
streams[1], # write stream
self.server.create_initialization_options(),
raise_exceptions=True # Enable for debugging
)
except Exception as e:
logger.error(f"Server error: {str(e)}")
raise MCPDevServerError(f"Server error: {str(e)}")
finally:
logger.info("Server shutdown")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/templates.py:
--------------------------------------------------------------------------------
```python
"""Input request templates for common scenarios."""
from typing import Dict
from .input_protocol import InputRequest, InputField
ENVIRONMENT_SETUP = InputRequest(
request_id="environment_setup",
title="Setup Development Environment",
description="Configure your development environment",
fields=[
InputField(
name="language",
type="select",
description="Primary programming language",
options=[
{"value": "python", "label": "Python"},
{"value": "node", "label": "Node.js"},
{"value": "both", "label": "Python & Node.js"}
]
),
InputField(
name="python_version",
type="select",
description="Python version",
options=[
{"value": "3.12", "label": "Python 3.12"},
{"value": "3.11", "label": "Python 3.11"},
{"value": "3.10", "label": "Python 3.10"}
],
required=False
),
InputField(
name="node_version",
type="select",
description="Node.js version",
options=[
{"value": "20", "label": "Node.js 20 LTS"},
{"value": "18", "label": "Node.js 18 LTS"}
],
required=False
),
InputField(
name="include_docker",
type="confirm",
description="Include Docker support?",
default=False
)
]
)
TEST_CONFIGURATION = InputRequest(
request_id="test_configuration",
title="Configure Test Environment",
description="Set up testing parameters",
fields=[
InputField(
name="test_framework",
type="select",
description="Testing framework",
options=[
{"value": "pytest", "label": "pytest"},
{"value": "unittest", "label": "unittest"},
{"value": "jest", "label": "Jest"},
{"value": "mocha", "label": "Mocha"}
]
),
InputField(
name="include_coverage",
type="confirm",
description="Include coverage reporting?",
default=True
),
InputField(
name="parallel",
type="confirm",
description="Run tests in parallel?",
default=False
),
InputField(
name="test_path",
type="text",
description="Test directory or file pattern",
default="tests/",
required=False
)
]
)
DEPLOYMENT_CONFIG = InputRequest(
request_id="deployment_config",
title="Configure Deployment",
description="Set up deployment parameters",
fields=[
InputField(
name="environment",
type="select",
description="Deployment environment",
options=[
{"value": "development", "label": "Development"},
{"value": "staging", "label": "Staging"},
{"value": "production", "label": "Production"}
]
),
InputField(
name="deploy_method",
type="select",
description="Deployment method",
options=[
{"value": "docker", "label": "Docker Container"},
{"value": "kubernetes", "label": "Kubernetes"},
{"value": "serverless", "label": "Serverless"}
]
),
InputField(
name="auto_deploy",
type="confirm",
description="Enable automatic deployment?",
default=False
),
InputField(
name="rollback_enabled",
type="confirm",
description="Enable automatic rollback?",
default=True
)
]
)
DEBUG_CONFIG = InputRequest(
request_id="debug_config",
title="Configure Debugging Session",
description="Set up debugging parameters",
fields=[
InputField(
name="debug_type",
type="select",
description="Type of debugging",
options=[
{"value": "python", "label": "Python Debugger"},
{"value": "node", "label": "Node.js Debugger"},
{"value": "remote", "label": "Remote Debugging"}
]
),
InputField(
name="port",
type="number",
description="Debug port",
default=9229,
validation={"min": 1024, "max": 65535}
),
InputField(
name="break_on_entry",
type="confirm",
description="Break on entry point?",
default=True
)
]
)
TEMPLATE_REQUESTS: Dict[str, InputRequest] = {
"environment_setup": ENVIRONMENT_SETUP,
"test_configuration": TEST_CONFIGURATION,
"deployment_config": DEPLOYMENT_CONFIG,
"debug_config": DEBUG_CONFIG
}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/project_types.py:
--------------------------------------------------------------------------------
```python
"""Project type definitions and configurations."""
from typing import Dict, Any, List
from enum import Enum
class BuildSystem(str, Enum):
"""Build system types."""
MAVEN = "maven"
GRADLE = "gradle"
NPM = "npm"
YARN = "yarn"
PIP = "pip"
POETRY = "poetry"
DOTNET = "dotnet"
CARGO = "cargo"
GO = "go"
SBT = "sbt"
class ProjectType:
"""Base project type configuration."""
def __init__(
self,
name: str,
description: str,
file_structure: Dict[str, Any],
build_systems: List[BuildSystem],
default_build_system: BuildSystem,
config_files: List[str],
environment_variables: Dict[str, str],
docker_templates: List[str],
input_templates: List[str]
):
self.name = name
self.description = description
self.file_structure = file_structure
self.build_systems = build_systems
self.default_build_system = default_build_system
self.config_files = config_files
self.environment_variables = environment_variables
self.docker_templates = docker_templates
self.input_templates = input_templates
# Define standard project types
JAVA_PROJECT = ProjectType(
name="java",
description="Java project",
file_structure={
"src/": {
"main/": {
"java/": {},
"resources/": {}
},
"test/": {
"java/": {},
"resources/": {}
}
},
"target/": {},
},
build_systems=[BuildSystem.MAVEN, BuildSystem.GRADLE],
default_build_system=BuildSystem.MAVEN,
config_files=["pom.xml", "build.gradle", ".gitignore", "README.md"],
environment_variables={
"JAVA_HOME": "",
"MAVEN_HOME": "",
"GRADLE_HOME": ""
},
docker_templates=["java-maven", "java-gradle"],
input_templates=["java_config", "maven_config", "gradle_config"]
)
DOTNET_PROJECT = ProjectType(
name="dotnet",
description=".NET project",
file_structure={
"src/": {},
"tests/": {},
"docs/": {}
},
build_systems=[BuildSystem.DOTNET],
default_build_system=BuildSystem.DOTNET,
config_files=[".csproj", ".sln", "global.json", ".gitignore", "README.md"],
environment_variables={
"DOTNET_ROOT": "",
"ASPNETCORE_ENVIRONMENT": "Development"
},
docker_templates=["dotnet-sdk", "dotnet-runtime"],
input_templates=["dotnet_config", "aspnet_config"]
)
NODE_PROJECT = ProjectType(
name="node",
description="Node.js project",
file_structure={
"src/": {},
"tests/": {},
"dist/": {},
"public/": {}
},
build_systems=[BuildSystem.NPM, BuildSystem.YARN],
default_build_system=BuildSystem.NPM,
config_files=["package.json", "tsconfig.json", ".gitignore", "README.md"],
environment_variables={
"NODE_ENV": "development",
"NPM_TOKEN": ""
},
docker_templates=["node-dev", "node-prod"],
input_templates=["node_config", "npm_config", "typescript_config"]
)
PYTHON_PROJECT = ProjectType(
name="python",
description="Python project",
file_structure={
"src/": {},
"tests/": {},
"docs/": {},
"notebooks/": {}
},
build_systems=[BuildSystem.PIP, BuildSystem.POETRY],
default_build_system=BuildSystem.POETRY,
config_files=["pyproject.toml", "setup.py", "requirements.txt", ".gitignore", "README.md"],
environment_variables={
"PYTHONPATH": "src",
"PYTHON_ENV": "development"
},
docker_templates=["python-dev", "python-prod"],
input_templates=["python_config", "poetry_config", "pytest_config"]
)
GOLANG_PROJECT = ProjectType(
name="golang",
description="Go project",
file_structure={
"cmd/": {},
"internal/": {},
"pkg/": {},
"api/": {}
},
build_systems=[BuildSystem.GO],
default_build_system=BuildSystem.GO,
config_files=["go.mod", "go.sum", ".gitignore", "README.md"],
environment_variables={
"GOPATH": "",
"GO111MODULE": "on"
},
docker_templates=["golang-dev", "golang-prod"],
input_templates=["golang_config", "go_mod_config"]
)
RUST_PROJECT = ProjectType(
name="rust",
description="Rust project",
file_structure={
"src/": {},
"tests/": {},
"benches/": {},
"examples/": {}
},
build_systems=[BuildSystem.CARGO],
default_build_system=BuildSystem.CARGO,
config_files=["Cargo.toml", "Cargo.lock", ".gitignore", "README.md"],
environment_variables={
"RUST_BACKTRACE": "1",
"CARGO_HOME": ""
},
docker_templates=["rust-dev", "rust-prod"],
input_templates=["rust_config", "cargo_config"]
)
# Map of all available project types
PROJECT_TYPES: Dict[str, ProjectType] = {
"java": JAVA_PROJECT,
"dotnet": DOTNET_PROJECT,
"node": NODE_PROJECT,
"python": PYTHON_PROJECT,
"golang": GOLANG_PROJECT,
"rust": RUST_PROJECT
}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/git.py:
--------------------------------------------------------------------------------
```python
"""Git integration for MCP Development Server."""
import os
from typing import List, Optional
from git import Repo, GitCommandError
from git.objects import Commit
from ..utils.logging import setup_logging
from ..utils.errors import GitError
logger = setup_logging(__name__)
class GitManager:
"""Manages Git operations for a project."""
def __init__(self, project_path: str):
self.project_path = project_path
self.repo: Optional[Repo] = None
async def initialize(self) -> None:
"""Initialize Git repository."""
try:
self.repo = Repo.init(self.project_path)
# Create default .gitignore if it doesn't exist
gitignore_path = os.path.join(self.project_path, '.gitignore')
if not os.path.exists(gitignore_path):
with open(gitignore_path, 'w') as f:
f.write('\n'.join([
'# Python',
'__pycache__/',
'*.pyc',
'*.pyo',
'*.pyd',
'.Python',
'env/',
'venv/',
'.env',
'.venv',
'',
'# IDE',
'.idea/',
'.vscode/',
'*.swp',
'*.swo',
'',
'# Project specific',
'.mcp/',
'dist/',
'build/',
'*.egg-info/',
''
]))
# Initial commit
if not self.repo.heads:
self.repo.index.add(['.gitignore'])
self.repo.index.commit("Initial commit")
logger.info(f"Initialized Git repository at {self.project_path}")
except Exception as e:
raise GitError(f"Git initialization failed: {str(e)}")
async def get_status(self) -> dict:
"""Get repository status."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
return {
"branch": self.repo.active_branch.name,
"changed_files": [item.a_path for item in self.repo.index.diff(None)],
"untracked_files": self.repo.untracked_files,
"is_dirty": self.repo.is_dirty(),
"head_commit": {
"hash": self.repo.head.commit.hexsha,
"message": self.repo.head.commit.message,
"author": str(self.repo.head.commit.author),
"date": str(self.repo.head.commit.authored_datetime)
}
}
except Exception as e:
raise GitError(f"Failed to get Git status: {str(e)}")
async def commit(self, message: str, files: Optional[List[str]] = None) -> str:
"""Create a new commit."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
# Add specified files or all changes
if files:
self.repo.index.add(files)
else:
self.repo.index.add('.')
# Create commit
commit = self.repo.index.commit(message)
logger.info(f"Created commit: {commit.hexsha}")
return commit.hexsha
except Exception as e:
raise GitError(f"Failed to create commit: {str(e)}")
async def get_commit_history(
self,
max_count: Optional[int] = None
) -> List[dict]:
"""Get commit history."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
commits = []
for commit in self.repo.iter_commits(max_count=max_count):
commits.append({
"hash": commit.hexsha,
"message": commit.message,
"author": str(commit.author),
"date": str(commit.authored_datetime),
"files": list(commit.stats.files.keys())
})
return commits
except Exception as e:
raise GitError(f"Failed to get commit history: {str(e)}")
async def create_branch(self, name: str) -> None:
"""Create a new branch."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
self.repo.create_head(name)
logger.info(f"Created branch: {name}")
except Exception as e:
raise GitError(f"Failed to create branch: {str(e)}")
async def checkout(self, branch: str) -> None:
"""Checkout a branch."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
self.repo.git.checkout(branch)
logger.info(f"Checked out branch: {branch}")
except Exception as e:
raise GitError(f"Failed to checkout branch: {str(e)}")
async def get_diff(
self,
commit_a: Optional[str] = None,
commit_b: Optional[str] = None
) -> str:
"""Get diff between commits or working directory."""
try:
if not self.repo:
raise GitError("Git repository not initialized")
return self.repo.git.diff(commit_a, commit_b)
except Exception as e:
raise GitError(f"Failed to get diff: {str(e)}")
async def cleanup(self) -> None:
"""Clean up Git resources."""
self.repo = None
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/manager.py:
--------------------------------------------------------------------------------
```python
"""Environment management for MCP Development Server."""
import os
import json
from typing import Dict, List, Optional, Any
from pathlib import Path
from ..docker.manager import DockerManager
from ..docker.volumes import VolumeManager
from ..docker.templates import DockerTemplates
from ..utils.logging import setup_logging
from ..utils.errors import EnvironmentError
logger = setup_logging(__name__)
class EnvironmentManager:
"""Manages development environments."""
def __init__(self):
self.docker_manager = DockerManager()
self.volume_manager = VolumeManager()
self.environments: Dict[str, Dict[str, Any]] = {}
async def create_environment(
self,
name: str,
project_path: str,
env_type: str,
config: Optional[Dict[str, Any]] = None
) -> str:
"""Create a new development environment."""
try:
config = config or {}
# Create environment directory
env_path = os.path.join(project_path, '.mcp', 'environments', name)
os.makedirs(env_path, exist_ok=True)
# Generate Dockerfile
dockerfile_content = DockerTemplates.get_template(env_type, config)
dockerfile_path = os.path.join(env_path, 'Dockerfile')
with open(dockerfile_path, 'w') as f:
f.write(dockerfile_content)
# Create volumes for persistence
volumes = {}
for volume_name in ['src', 'deps', 'cache']:
volume = await self.volume_manager.create_volume(
f"mcp-{name}-{volume_name}",
labels={
'mcp.environment': name,
'mcp.volume.type': volume_name
}
)
volumes[volume] = {'bind': f'/app/{volume_name}', 'mode': 'rw'}
# Create container
container_id = await self.docker_manager.create_container(
project_path=project_path,
environment=name,
dockerfile=dockerfile_path,
volumes=volumes,
environment_vars=config.get('env_vars'),
ports=config.get('ports')
)
# Store environment configuration
self.environments[name] = {
'id': container_id,
'type': env_type,
'path': env_path,
'config': config,
'volumes': volumes
}
# Save environment metadata
self._save_environment_metadata(name)
logger.info(f"Created environment: {name}")
return container_id
except Exception as e:
raise EnvironmentError(f"Failed to create environment: {str(e)}")
async def remove_environment(self, name: str) -> None:
"""Remove a development environment."""
try:
if env := self.environments.get(name):
# Stop container
await self.docker_manager.stop_container(name)
# Remove volumes
for volume in env['volumes']:
await self.volume_manager.remove_volume(volume)
# Remove environment directory
import shutil
shutil.rmtree(env['path'])
# Remove from environments dict
del self.environments[name]
logger.info(f"Removed environment: {name}")
else:
raise EnvironmentError(f"Environment not found: {name}")
except Exception as e:
raise EnvironmentError(f"Failed to remove environment: {str(e)}")
async def execute_in_environment(
self,
name: str,
command: str,
workdir: Optional[str] = None
) -> Dict[str, Any]:
"""Execute a command in an environment."""
try:
if name not in self.environments:
raise EnvironmentError(f"Environment not found: {name}")
return await self.docker_manager.execute_command(
environment=name,
command=command,
workdir=workdir
)
except Exception as e:
raise EnvironmentError(f"Failed to execute command: {str(e)}")
async def get_environment_status(self, name: str) -> Dict[str, Any]:
"""Get environment status including container and volumes."""
try:
if env := self.environments.get(name):
container_status = await self.docker_manager.get_container_status(name)
volumes_status = {}
for volume in env['volumes']:
volumes_status[volume] = await self.volume_manager.get_volume_info(volume)
return {
'container': container_status,
'volumes': volumes_status,
'type': env['type'],
'config': env['config']
}
else:
raise EnvironmentError(f"Environment not found: {name}")
except Exception as e:
raise EnvironmentError(f"Failed to get environment status: {str(e)}")
def _save_environment_metadata(self, name: str) -> None:
"""Save environment metadata to disk."""
if env := self.environments.get(name):
metadata_path = os.path.join(env['path'], 'metadata.json')
with open(metadata_path, 'w') as f:
json.dump({
'name': name,
'type': env['type'],
'config': env['config'],
'volumes': list(env['volumes'].keys())
}, f, indent=2)
async def cleanup(self) -> None:
"""Clean up all environments."""
for name in list(self.environments.keys()):
try:
await self.remove_environment(name)
except Exception as e:
logger.error(f"Error cleaning up environment {name}: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/project_templates.py:
--------------------------------------------------------------------------------
```python
"""Project-specific input templates."""
from typing import Dict
from .input_protocol import InputRequest, InputField
# Java Project Templates
JAVA_CONFIG = InputRequest(
request_id="java_config",
title="Java Project Configuration",
description="Configure Java project settings",
fields=[
InputField(
name="java_version",
type="select",
description="Java version",
options=[
{"value": "21", "label": "Java 21 (LTS)"},
{"value": "17", "label": "Java 17 (LTS)"},
{"value": "11", "label": "Java 11 (LTS)"},
{"value": "8", "label": "Java 8"}
]
),
InputField(
name="project_type",
type="select",
description="Project type",
options=[
{"value": "spring-boot", "label": "Spring Boot"},
{"value": "jakarta-ee", "label": "Jakarta EE"},
{"value": "android", "label": "Android"},
{"value": "library", "label": "Java Library"}
]
),
InputField(
name="packaging",
type="select",
description="Packaging type",
options=[
{"value": "jar", "label": "JAR"},
{"value": "war", "label": "WAR"},
{"value": "ear", "label": "EAR"}
]
)
]
)
# .NET Project Templates
DOTNET_CONFIG = InputRequest(
request_id="dotnet_config",
title=".NET Project Configuration",
description="Configure .NET project settings",
fields=[
InputField(
name="dotnet_version",
type="select",
description=".NET version",
options=[
{"value": "8.0", "label": ".NET 8.0"},
{"value": "7.0", "label": ".NET 7.0"},
{"value": "6.0", "label": ".NET 6.0 (LTS)"}
]
),
InputField(
name="project_type",
type="select",
description="Project type",
options=[
{"value": "webapi", "label": "ASP.NET Core Web API"},
{"value": "mvc", "label": "ASP.NET Core MVC"},
{"value": "blazor", "label": "Blazor"},
{"value": "maui", "label": ".NET MAUI"},
{"value": "library", "label": "Class Library"}
]
),
InputField(
name="authentication",
type="select",
description="Authentication type",
options=[
{"value": "none", "label": "None"},
{"value": "individual", "label": "Individual Accounts"},
{"value": "microsoft", "label": "Microsoft Identity Platform"},
{"value": "windows", "label": "Windows Authentication"}
]
)
]
)
# Node.js Project Templates
NODE_CONFIG = InputRequest(
request_id="node_config",
title="Node.js Project Configuration",
description="Configure Node.js project settings",
fields=[
InputField(
name="node_version",
type="select",
description="Node.js version",
options=[
{"value": "20", "label": "Node.js 20 (LTS)"},
{"value": "18", "label": "Node.js 18 (LTS)"}
]
),
InputField(
name="project_type",
type="select",
description="Project type",
options=[
{"value": "express", "label": "Express.js"},
{"value": "next", "label": "Next.js"},
{"value": "nest", "label": "NestJS"},
{"value": "library", "label": "NPM Package"}
]
),
InputField(
name="typescript",
type="confirm",
description="Use TypeScript?",
default=True
)
]
)
# Python Project Templates
PYTHON_CONFIG = InputRequest(
request_id="python_config",
title="Python Project Configuration",
description="Configure Python project settings",
fields=[
InputField(
name="python_version",
type="select",
description="Python version",
options=[
{"value": "3.12", "label": "Python 3.12"},
{"value": "3.11", "label": "Python 3.11"},
{"value": "3.10", "label": "Python 3.10"}
]
),
InputField(
name="project_type",
type="select",
description="Project type",
options=[
{"value": "fastapi", "label": "FastAPI"},
{"value": "django", "label": "Django"},
{"value": "flask", "label": "Flask"},
{"value": "library", "label": "Python Package"}
]
),
InputField(
name="dependency_management",
type="select",
description="Dependency management",
options=[
{"value": "poetry", "label": "Poetry"},
{"value": "pip", "label": "pip + requirements.txt"},
{"value": "pipenv", "label": "Pipenv"}
]
)
]
)
# Golang Project Templates
GOLANG_CONFIG = InputRequest(
request_id="golang_config",
title="Go Project Configuration",
description="Configure Go project settings",
fields=[
InputField(
name="go_version",
type="select",
description="Go version",
options=[
{"value": "1.22", "label": "Go 1.22"},
{"value": "1.21", "label": "Go 1.21"},
{"value": "1.20", "label": "Go 1.20"}
]
),
InputField(
name="project_type",
type="select",
description="Project type",
options=[
{"value": "gin", "label": "Gin Web Framework"},
{"value": "echo", "label": "Echo Framework"},
{"value": "cli", "label": "CLI Application"},
{"value": "library", "label": "Go Module"}
]
),
InputField(
name="module_path",
type="text",
description="Module path (e.g., github.com/user/repo)",
validation={"pattern": r"^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+(/[a-zA-Z0-9_.-]+)?$"}
)
]
)
# All project templates
PROJECT_TEMPLATES: Dict[str, InputRequest] = {
"java_config": JAVA_CONFIG,
"dotnet_config": DOTNET_CONFIG,
"node_config": NODE_CONFIG,
"python_config": PYTHON_CONFIG,
"golang_config": GOLANG_CONFIG
}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/templates.py:
--------------------------------------------------------------------------------
```python
"""Template system for project creation."""
import os
import shutil
from pathlib import Path
from typing import Dict, Any, List
import jinja2
import yaml
from ..utils.logging import setup_logging
from ..utils.errors import ProjectError
logger = setup_logging(__name__)
class TemplateManager:
"""Manages project templates."""
def __init__(self):
"""Initialize template manager."""
self.template_dir = self._get_template_dir()
self.env = jinja2.Environment(
loader=jinja2.FileSystemLoader(str(self.template_dir)),
autoescape=jinja2.select_autoescape()
)
def _get_template_dir(self) -> Path:
"""Get templates directory path."""
if os.name == "nt": # Windows
template_dir = Path(os.getenv("APPDATA")) / "Claude" / "templates"
else: # macOS/Linux
template_dir = Path.home() / ".config" / "claude" / "templates"
template_dir.mkdir(parents=True, exist_ok=True)
# Initialize with basic template if empty
if not any(template_dir.iterdir()):
self._initialize_basic_template(template_dir)
return template_dir
def _initialize_basic_template(self, template_dir: Path):
"""Initialize basic project template.
Args:
template_dir: Templates directory path
"""
basic_dir = template_dir / "basic"
basic_dir.mkdir(exist_ok=True)
# Create template configuration
config = {
"name": "basic",
"description": "Basic project template",
"version": "1.0.0",
"files": [
"README.md",
"requirements.txt",
".gitignore",
"src/__init__.py",
"tests/__init__.py"
],
"variables": {
"project_name": "",
"description": ""
},
"features": {
"git": True,
"tests": True,
"docker": False
}
}
with open(basic_dir / "template.yaml", "w") as f:
yaml.dump(config, f)
# Create template files
readme_content = """# {{ project_name }}
{{ description }}
## Installation
```bash
pip install -r requirements.txt
```
## Usage
```python
from {{ project_name.lower() }} import main
```
## Testing
```bash
pytest tests/
```
"""
with open(basic_dir / "README.md", "w") as f:
f.write(readme_content)
# Create source directory
src_dir = basic_dir / "src"
src_dir.mkdir(exist_ok=True)
with open(src_dir / "__init__.py", "w") as f:
f.write('"""{{ project_name }} package."""\n')
# Create tests directory
tests_dir = basic_dir / "tests"
tests_dir.mkdir(exist_ok=True)
with open(tests_dir / "__init__.py", "w") as f:
f.write('"""Tests for {{ project_name }}."""\n')
# Create requirements.txt
with open(basic_dir / "requirements.txt", "w") as f:
f.write("pytest>=7.0.0\n")
# Create .gitignore
gitignore_content = """__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
"""
with open(basic_dir / ".gitignore", "w") as f:
f.write(gitignore_content)
async def apply_template(self, template_name: str, project: Any) -> None:
"""Apply template to project.
Args:
template_name: Name of template to apply
project: Project instance
"""
try:
template_path = self.template_dir / template_name
if not template_path.exists():
raise ProjectError(f"Template not found: {template_name}")
# Load template configuration
with open(template_path / "template.yaml", "r") as f:
template_config = yaml.safe_load(f)
# Prepare template variables
variables = {
"project_name": project.config.name,
"description": project.config.description
}
# Process each template file
for file_path in template_config["files"]:
template_file = template_path / file_path
if template_file.exists():
# Create target directory if needed
target_path = Path(project.path) / file_path
target_path.parent.mkdir(parents=True, exist_ok=True)
# Render template content
template = self.env.get_template(f"{template_name}/{file_path}")
content = template.render(**variables)
# Write rendered content
with open(target_path, "w") as f:
f.write(content)
logger.info(f"Applied template {template_name} to project {project.config.name}")
except Exception as e:
logger.error(f"Failed to apply template: {str(e)}")
raise ProjectError(f"Template application failed: {str(e)}")
async def template_has_git(self, template_name: str) -> bool:
"""Check if template includes Git initialization.
Args:
template_name: Template name
Returns:
bool: True if template includes Git
"""
try:
template_path = self.template_dir / template_name
if not template_path.exists():
return False
# Load template configuration
with open(template_path / "template.yaml", "r") as f:
template_config = yaml.safe_load(f)
return template_config.get("features", {}).get("git", False)
except Exception:
return False
def list_templates(self) -> List[Dict[str, Any]]:
"""Get list of available templates.
Returns:
List[Dict[str, Any]]: Template information
"""
templates = []
for template_dir in self.template_dir.iterdir():
if template_dir.is_dir():
config_path = template_dir / "template.yaml"
if config_path.exists():
with open(config_path, "r") as f:
config = yaml.safe_load(f)
templates.append(config)
return templates
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/context.py:
--------------------------------------------------------------------------------
```python
"""Project context management for MCP Development Server."""
import os
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from pydantic import BaseModel
from ..utils.config import ProjectConfig
from ..utils.logging import setup_logging
from ..utils.errors import ProjectError, FileOperationError
logger = setup_logging(__name__)
class ProjectState(BaseModel):
"""Project state tracking."""
initialized: bool = False
last_build_time: Optional[datetime] = None
last_build_status: Optional[str] = None
last_test_time: Optional[datetime] = None
last_test_status: Optional[str] = None
git_initialized: bool = False
class ProjectContext:
"""Manages the context and state of a development project."""
def __init__(self, config: ProjectConfig):
self.id = str(uuid.uuid4())
self.config = config
self.path = config.path
self.state = ProjectState()
self._file_watchers: Dict[str, Any] = {}
async def initialize(self) -> None:
"""Initialize project structure and state."""
try:
# Create project directory
os.makedirs(self.path, exist_ok=True)
# Create project structure
await self._create_project_structure()
# Initialize state file
await self._init_state_file()
# Set up file watchers
await self._setup_file_watchers()
self.state.initialized = True
logger.info(f"Initialized project {self.config.name} at {self.path}")
except Exception as e:
raise ProjectError(f"Project initialization failed: {str(e)}")
async def _create_project_structure(self) -> None:
"""Create initial project directory structure."""
try:
# Create standard directories
for dir_name in ['.mcp', 'src', 'tests', 'docs']:
os.makedirs(os.path.join(self.path, dir_name), exist_ok=True)
# Create basic configuration files
config_path = os.path.join(self.path, '.mcp', 'project.json')
with open(config_path, 'w') as f:
json.dump(self.config.dict(), f, indent=2, default=str)
except Exception as e:
raise FileOperationError(f"Failed to create project structure: {str(e)}")
async def _init_state_file(self) -> None:
"""Initialize project state file."""
try:
state_path = os.path.join(self.path, '.mcp', 'state.json')
with open(state_path, 'w') as f:
json.dump(self.state.dict(), f, indent=2, default=str)
except Exception as e:
raise FileOperationError(f"Failed to initialize state file: {str(e)}")
async def _setup_file_watchers(self) -> None:
"""Set up file system watchers for project directories."""
# To be implemented with file watching functionality
pass
def get_structure(self) -> Dict[str, Any]:
"""Get project structure as a dictionary."""
structure = {"name": self.config.name, "type": "directory", "children": []}
def scan_directory(path: Path, current_dict: Dict[str, Any]) -> None:
try:
for item in path.iterdir():
# Skip hidden files and .mcp directory
if item.name.startswith('.'):
continue
if item.is_file():
current_dict["children"].append({
"name": item.name,
"type": "file",
"size": item.stat().st_size
})
elif item.is_dir():
dir_dict = {
"name": item.name,
"type": "directory",
"children": []
}
current_dict["children"].append(dir_dict)
scan_directory(item, dir_dict)
except Exception as e:
logger.error(f"Error scanning directory {path}: {str(e)}")
scan_directory(Path(self.path), structure)
return structure
def get_file_content(self, relative_path: str) -> str:
"""Get content of a project file."""
try:
file_path = os.path.join(self.path, relative_path)
if not os.path.exists(file_path):
raise FileOperationError(f"File not found: {relative_path}")
# Basic security check
if not os.path.normpath(file_path).startswith(str(self.path)):
raise FileOperationError("Invalid file path")
with open(file_path, 'r') as f:
return f.read()
except Exception as e:
raise FileOperationError(f"Failed to read file {relative_path}: {str(e)}")
async def update_file(self, relative_path: str, content: str) -> None:
"""Update content of a project file."""
try:
file_path = os.path.join(self.path, relative_path)
# Create directories if needed
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Security check
if not os.path.normpath(file_path).startswith(str(self.path)):
raise FileOperationError("Invalid file path")
with open(file_path, 'w') as f:
f.write(content)
logger.info(f"Updated file: {relative_path}")
except Exception as e:
raise FileOperationError(f"Failed to update file {relative_path}: {str(e)}")
async def delete_file(self, relative_path: str) -> None:
"""Delete a project file."""
try:
file_path = os.path.join(self.path, relative_path)
# Security check
if not os.path.normpath(file_path).startswith(str(self.path)):
raise FileOperationError("Invalid file path")
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted file: {relative_path}")
else:
logger.warning(f"File not found: {relative_path}")
except Exception as e:
raise FileOperationError(f"Failed to delete file {relative_path}: {str(e)}")
async def update_state(self, **kwargs) -> None:
"""Update project state."""
try:
# Update state object
for key, value in kwargs.items():
if hasattr(self.state, key):
setattr(self.state, key, value)
# Save to state file
state_path = os.path.join(self.path, '.mcp', 'state.json')
with open(state_path, 'w') as f:
json.dump(self.state.dict(), f, indent=2, default=str)
logger.info(f"Updated project state: {kwargs}")
except Exception as e:
raise ProjectError(f"Failed to update project state: {str(e)}")
async def cleanup(self) -> None:
"""Clean up project resources."""
try:
# Stop file watchers
for watcher in self._file_watchers.values():
await watcher.stop()
logger.info(f"Cleaned up project resources for {self.config.name}")
except Exception as e:
logger.error(f"Error during project cleanup: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/manager.py:
--------------------------------------------------------------------------------
```python
"""Docker integration for MCP Development Server."""
import asyncio
import docker
from typing import Dict, Any, Optional, List
from pathlib import Path
import tempfile
import yaml
import jinja2
from ..utils.logging import setup_logging
from ..utils.errors import MCPDevServerError
logger = setup_logging(__name__)
class DockerManager:
"""Manages Docker containers and environments."""
def __init__(self):
"""Initialize Docker manager."""
self.client = docker.from_env()
self.active_containers: Dict[str, Any] = {}
self._setup_template_environment()
def _setup_template_environment(self):
"""Set up Jinja2 template environment."""
template_dir = Path(__file__).parent / "templates"
self.template_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(str(template_dir)),
autoescape=jinja2.select_autoescape()
)
async def create_environment(
self,
name: str,
image: str,
project_path: str,
env_vars: Optional[Dict[str, str]] = None,
ports: Optional[Dict[str, str]] = None,
volumes: Optional[Dict[str, Dict[str, str]]] = None
) -> str:
"""Create a new Docker environment.
Args:
name: Environment name
image: Docker image name
project_path: Project directory path
env_vars: Environment variables
ports: Port mappings
volumes: Additional volume mappings
Returns:
str: Environment ID
"""
try:
# Ensure image is available
try:
self.client.images.get(image)
except docker.errors.ImageNotFound:
logger.info(f"Pulling image: {image}")
self.client.images.pull(image)
# Setup default volumes
container_volumes = {
project_path: {
"bind": "/workspace",
"mode": "rw"
}
}
if volumes:
container_volumes.update(volumes)
# Create container
container = self.client.containers.run(
image=image,
name=f"mcp-env-{name}",
detach=True,
volumes=container_volumes,
environment=env_vars or {},
ports=ports or {},
working_dir="/workspace",
remove=True
)
env_id = container.id
self.active_containers[env_id] = {
"name": name,
"container": container,
"status": "running"
}
logger.info(f"Created environment: {name} ({env_id})")
return env_id
except Exception as e:
logger.error(f"Failed to create environment: {str(e)}")
raise MCPDevServerError(f"Environment creation failed: {str(e)}")
async def generate_dockerfile(
self,
template: str,
variables: Dict[str, Any],
output_path: Optional[str] = None
) -> str:
"""Generate Dockerfile from template.
Args:
template: Template name
variables: Template variables
output_path: Optional path to save Dockerfile
Returns:
str: Generated Dockerfile content
"""
try:
template = self.template_env.get_template(f"{template}.dockerfile")
content = template.render(**variables)
if output_path:
with open(output_path, "w") as f:
f.write(content)
return content
except Exception as e:
logger.error(f"Failed to generate Dockerfile: {str(e)}")
raise MCPDevServerError(f"Dockerfile generation failed: {str(e)}")
async def create_compose_config(
self,
name: str,
services: Dict[str, Any],
output_path: Optional[str] = None
) -> str:
"""Create Docker Compose configuration.
Args:
name: Project name
services: Service configurations
output_path: Optional path to save docker-compose.yml
Returns:
str: Generated docker-compose.yml content
"""
try:
compose_config = {
"version": "3.8",
"services": services,
"networks": {
"mcp-network": {
"driver": "bridge"
}
}
}
content = yaml.dump(compose_config, default_flow_style=False)
if output_path:
with open(output_path, "w") as f:
f.write(content)
return content
except Exception as e:
logger.error(f"Failed to create Docker Compose config: {str(e)}")
raise MCPDevServerError(f"Compose config creation failed: {str(e)}")
async def execute_command(
self,
env_id: str,
command: str,
workdir: Optional[str] = None,
stream: bool = False
) -> Dict[str, Any]:
"""Execute command in Docker environment.
Args:
env_id: Environment ID
command: Command to execute
workdir: Working directory
stream: Stream output in real-time
Returns:
Dict[str, Any]: Command execution results
"""
try:
if env_id not in self.active_containers:
raise MCPDevServerError(f"Environment not found: {env_id}")
container = self.active_containers[env_id]["container"]
exec_result = container.exec_run(
command,
workdir=workdir or "/workspace",
stream=True
)
if stream:
output = []
for line in exec_result.output:
decoded_line = line.decode().strip()
output.append(decoded_line)
yield decoded_line
return {
"exit_code": exec_result.exit_code,
"output": output
}
else:
output = []
for line in exec_result.output:
output.append(line.decode().strip())
return {
"exit_code": exec_result.exit_code,
"output": output
}
except Exception as e:
logger.error(f"Command execution failed: {str(e)}")
raise MCPDevServerError(f"Command execution failed: {str(e)}")
async def cleanup(self):
"""Clean up Docker resources."""
try:
for env_id in list(self.active_containers.keys()):
await self.destroy_environment(env_id)
except Exception as e:
logger.error(f"Docker cleanup failed: {str(e)}")
raise MCPDevServerError(f"Docker cleanup failed: {str(e)}")
def get_logs(self, env_id: str, tail: Optional[int] = None) -> str:
"""Get container logs.
Args:
env_id: Environment ID
tail: Number of lines to return from the end
Returns:
str: Container logs
"""
try:
if env_id not in self.active_containers:
raise MCPDevServerError(f"Environment not found: {env_id}")
container = self.active_containers[env_id]["container"]
return container.logs(tail=tail).decode()
except Exception as e:
logger.error(f"Failed to get logs: {str(e)}")
raise MCPDevServerError(f"Log retrieval failed: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/project.py:
--------------------------------------------------------------------------------
```python
"""Project representation and management."""
import uuid
from typing import Dict, Any, Optional, List
from pathlib import Path
import git
from pydantic import BaseModel
class ProjectConfig(BaseModel):
"""Project configuration model."""
name: str
template: str
description: str = ""
version: str = "0.1.0"
class ProjectState:
"""Project state tracking."""
def __init__(self):
"""Initialize project state."""
self.git_initialized: bool = False
self.last_build: Optional[Dict[str, Any]] = None
self.last_test_run: Optional[Dict[str, Any]] = None
self.active_environments: List[str] = []
class Project:
"""Project instance representation."""
def __init__(self, path: str, config: ProjectConfig, state: ProjectState):
"""Initialize project instance.
Args:
path: Project directory path
config: Project configuration
state: Project state
"""
self.id = str(uuid.uuid4())
self.path = path
self.config = config
self.state = state
def get_structure(self) -> Dict[str, Any]:
"""Get project directory structure.
Returns:
Dict[str, Any]: Directory structure
"""
def scan_dir(path: Path) -> Dict[str, Any]:
structure = {}
for item in path.iterdir():
if item.name.startswith("."):
continue
if item.is_file():
structure[item.name] = "file"
elif item.is_dir():
structure[item.name] = scan_dir(item)
return structure
return scan_dir(Path(self.path))
def get_git_status(self) -> Dict[str, Any]:
"""Get Git repository status.
Returns:
Dict[str, Any]: Git status information
"""
if not self.state.git_initialized:
return {"initialized": False}
try:
repo = git.Repo(self.path)
return {
"initialized": True,
"branch": repo.active_branch.name,
"changed_files": [item.a_path for item in repo.index.diff(None)],
"untracked_files": repo.untracked_files,
"ahead": sum(1 for c in repo.iter_commits("origin/main..main")),
"behind": sum(1 for c in repo.iter_commits("main..origin/main"))
}
except Exception as e:
return {
"initialized": False,
"error": str(e)
}
async def create_git_commit(self, message: str, files: Optional[List[str]] = None) -> Dict[str, Any]:
"""Create a Git commit.
Args:
message: Commit message
files: Optional list of files to commit
Returns:
Dict[str, Any]: Commit information
"""
if not self.state.git_initialized:
raise ValueError("Git is not initialized for this project")
try:
repo = git.Repo(self.path)
if files:
repo.index.add(files)
else:
repo.index.add("*")
commit = repo.index.commit(message)
return {
"commit_id": commit.hexsha,
"message": message,
"author": str(commit.author),
"files": [item.a_path for item in commit.stats.files]
}
except Exception as e:
raise ValueError(f"Failed to create commit: {str(e)}")
def get_dependencies(self) -> Dict[str, Any]:
"""Get project dependencies.
Returns:
Dict[str, Any]: Dependency information
"""
dependencies = {}
# Check Python dependencies
req_file = Path(self.path) / "requirements.txt"
if req_file.exists():
with open(req_file, "r") as f:
dependencies["python"] = f.read().splitlines()
# Check Node.js dependencies
package_file = Path(self.path) / "package.json"
if package_file.exists():
import json
with open(package_file, "r") as f:
package_data = json.load(f)
dependencies["node"] = {
"dependencies": package_data.get("dependencies", {}),
"devDependencies": package_data.get("devDependencies", {})
}
return dependencies
def analyze_code(self) -> Dict[str, Any]:
"""Analyze project code.
Returns:
Dict[str, Any]: Code analysis results
"""
analysis = {
"files": {},
"summary": {
"total_files": 0,
"total_lines": 0,
"code_lines": 0,
"comment_lines": 0,
"blank_lines": 0
}
}
def analyze_file(path: Path) -> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
total_lines = len(lines)
blank_lines = sum(1 for line in lines if not line.strip())
comment_lines = sum(1 for line in lines if line.strip().startswith("#"))
code_lines = total_lines - blank_lines - comment_lines
return {
"total_lines": total_lines,
"code_lines": code_lines,
"comment_lines": comment_lines,
"blank_lines": blank_lines
}
for root, _, files in os.walk(self.path):
for file in files:
if file.endswith(".py"):
file_path = Path(root) / file
try:
file_analysis = analyze_file(file_path)
relative_path = str(file_path.relative_to(self.path))
analysis["files"][relative_path] = file_analysis
# Update summary
for key in ["total_lines", "code_lines", "comment_lines", "blank_lines"]:
analysis["summary"][key] += file_analysis[key]
analysis["summary"]["total_files"] += 1
except Exception:
continue
return analysis
def get_test_coverage(self) -> Dict[str, Any]:
"""Get test coverage information.
Returns:
Dict[str, Any]: Test coverage data
"""
try:
import coverage
cov = coverage.Coverage()
cov.load()
return {
"total_coverage": cov.report(),
"missing_lines": dict(cov.analysis2()),
"branch_coverage": cov.get_option("branch"),
"excluded_lines": cov.get_exclude_list()
}
except Exception:
return {
"error": "Coverage data not available"
}
def get_ci_config(self) -> Dict[str, Any]:
"""Get CI configuration.
Returns:
Dict[str, Any]: CI configuration data
"""
ci_configs = {}
# Check GitHub Actions
github_dir = Path(self.path) / ".github" / "workflows"
if github_dir.exists():
ci_configs["github_actions"] = []
for workflow in github_dir.glob("*.yml"):
with open(workflow, "r") as f:
ci_configs["github_actions"].append({
"name": workflow.stem,
"config": f.read()
})
# Check GitLab CI
gitlab_file = Path(self.path) / ".gitlab-ci.yml"
if gitlab_file.exists():
with open(gitlab_file, "r") as f:
ci_configs["gitlab"] = f.read()
return ci_configs
async def cleanup(self):
"""Clean up project resources."""
# Implementation will depend on what resources need cleanup
pass
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/core/server.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import websockets
from typing import Callable, Any, Dict, Optional
import logging
import traceback
logger = logging.getLogger(__name__)
class Server:
"""Core server class implementing JSON-RPC 2.0 protocol."""
def __init__(self, name: str):
"""Initialize the server.
Args:
name: Server name
"""
self.name = name
self.websocket = None
self.input_request_handlers = {}
self.input_response_handlers = {}
self.initialized = False
self.capabilities = {}
async def start(self, host: str = "localhost", port: int = 8000):
"""Start the WebSocket server.
Args:
host: Host to bind to
port: Port to listen on
"""
async def handler(websocket, path):
self.websocket = websocket
try:
logger.info(f"New WebSocket connection from {websocket.remote_address}")
async for message in websocket:
response = None
try:
# Parse JSON-RPC message
data = json.loads(message)
if not isinstance(data, dict):
raise ValueError("Invalid JSON-RPC message")
# Handle message
response = await self.handle_jsonrpc(data)
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}")
response = {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error",
"data": str(e)
},
"id": None
}
except Exception as e:
logger.error(f"Error handling message: {str(e)}", exc_info=True)
response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": "Internal error",
"data": {
"error": str(e),
"traceback": traceback.format_exc()
}
},
"id": getattr(data, "id", None) if isinstance(data, dict) else None
}
# Ensure we always send a properly formatted JSON-RPC response
if response:
try:
if not isinstance(response, dict):
response = {"result": response}
response["jsonrpc"] = "2.0"
if isinstance(data, dict) and "id" in data:
response["id"] = data["id"]
# Validate JSON before sending
response_str = json.dumps(response)
await websocket.send(response_str)
except Exception as e:
logger.error(f"Error sending response: {str(e)}", exc_info=True)
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": "Error sending response",
"data": str(e)
},
"id": data.get("id") if isinstance(data, dict) else None
}
await websocket.send(json.dumps(error_response))
except websockets.exceptions.ConnectionClosed:
logger.info("WebSocket connection closed")
finally:
self.websocket = None
try:
self.server = await websockets.serve(
handler,
host,
port,
ping_interval=20,
ping_timeout=20
)
logger.info(f"Server started on ws://{host}:{port}")
except Exception as e:
logger.error(f"Failed to start server: {str(e)}", exc_info=True)
raise
async def handle_jsonrpc(self, data: Dict) -> Optional[Dict]:
"""Handle JSON-RPC message.
Args:
data: Parsed JSON-RPC message
Returns:
Optional response message
"""
try:
method = data.get("method")
params = data.get("params", {})
logger.info(f"Handling method: {method} with params: {params}")
if method == "initialize":
self.capabilities = params.get("capabilities", {})
self.initialized = True
return {
"result": {
"capabilities": self.capabilities
}
}
if not self.initialized:
return {
"error": {
"code": -32002,
"message": "Server not initialized"
}
}
if method == "input/request":
handler = self.input_request_handlers.get("input_request")
if handler:
try:
result = await handler(
params.get("type", ""),
params.get("context", {})
)
return {"result": result}
except Exception as e:
logger.error(f"Error in input request handler: {str(e)}", exc_info=True)
return {
"error": {
"code": -32000,
"message": str(e),
"data": {
"traceback": traceback.format_exc()
}
}
}
elif method == "input/response":
handler = self.input_response_handlers.get("input_response")
if handler:
try:
await handler(params)
return {"result": None}
except Exception as e:
logger.error(f"Error in input response handler: {str(e)}", exc_info=True)
return {
"error": {
"code": -32000,
"message": str(e),
"data": {
"traceback": traceback.format_exc()
}
}
}
return {
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
except Exception as e:
logger.error(f"Error in handle_jsonrpc: {str(e)}", exc_info=True)
return {
"error": {
"code": -32603,
"message": "Internal error",
"data": {
"error": str(e),
"traceback": traceback.format_exc()
}
}
}
def request_input(self) -> Callable:
"""Decorator for input request handlers."""
def decorator(func: Callable) -> Callable:
self.input_request_handlers["input_request"] = func
return func
return decorator
def handle_input(self) -> Callable:
"""Decorator for input response handlers."""
def decorator(func: Callable) -> Callable:
self.input_response_handlers["input_response"] = func
return func
return decorator
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/base_project.py:
--------------------------------------------------------------------------------
```python
"""Base project class definition."""
import os
import uuid
import xml.etree.ElementTree as ET
import json
import tomli
from pathlib import Path
from typing import Dict, Any, Optional, List
import git
from .project_types import ProjectType, BuildSystem
from ..utils.errors import ProjectError
from ..utils.logging import setup_logging
logger = setup_logging(__name__)
class Project:
"""Base project class."""
def __init__(self, path: str, config: Dict[str, Any], project_type: ProjectType):
"""Initialize project instance."""
self.id = str(uuid.uuid4())
self.path = path
self.config = config
self.project_type = project_type
self.build_system = BuildSystem(config["build_system"])
def get_dependencies(self) -> Dict[str, Any]:
"""Get project dependencies."""
if self.build_system == BuildSystem.MAVEN:
return self._get_maven_dependencies()
elif self.build_system == BuildSystem.GRADLE:
return self._get_gradle_dependencies()
elif self.build_system in [BuildSystem.NPM, BuildSystem.YARN]:
return self._get_node_dependencies()
elif self.build_system == BuildSystem.POETRY:
return self._get_poetry_dependencies()
elif self.build_system == BuildSystem.DOTNET:
return self._get_dotnet_dependencies()
elif self.build_system == BuildSystem.GO:
return self._get_go_dependencies()
else:
return {}
def _get_maven_dependencies(self) -> Dict[str, Any]:
"""Get Maven project dependencies."""
pom_path = Path(self.path) / "pom.xml"
if not pom_path.exists():
return {}
try:
tree = ET.parse(pom_path)
root = tree.getroot()
ns = {'maven': 'http://maven.apache.org/POM/4.0.0'}
dependencies = []
for dep in root.findall('.//maven:dependency', ns):
dependencies.append({
'groupId': dep.find('maven:groupId', ns).text,
'artifactId': dep.find('maven:artifactId', ns).text,
'version': dep.find('maven:version', ns).text if dep.find('maven:version', ns) is not None else None,
'scope': dep.find('maven:scope', ns).text if dep.find('maven:scope', ns) is not None else 'compile'
})
return {'maven': dependencies}
except Exception as e:
logger.error(f"Error parsing Maven dependencies: {e}")
return {}
def _get_node_dependencies(self) -> Dict[str, Any]:
"""Get Node.js project dependencies."""
package_path = Path(self.path) / "package.json"
if not package_path.exists():
return {}
try:
with open(package_path) as f:
package_data = json.load(f)
return {
'dependencies': package_data.get('dependencies', {}),
'devDependencies': package_data.get('devDependencies', {})
}
except Exception as e:
logger.error(f"Error parsing Node.js dependencies: {e}")
return {}
def _get_poetry_dependencies(self) -> Dict[str, Any]:
"""Get Poetry project dependencies."""
pyproject_path = Path(self.path) / "pyproject.toml"
if not pyproject_path.exists():
return {}
try:
with open(pyproject_path, "rb") as f:
pyproject_data = tomli.load(f)
tool_poetry = pyproject_data.get('tool', {}).get('poetry', {})
return {
'dependencies': tool_poetry.get('dependencies', {}),
'dev-dependencies': tool_poetry.get('dev-dependencies', {})
}
except Exception as e:
logger.error(f"Error parsing Poetry dependencies: {e}")
return {}
def _get_dotnet_dependencies(self) -> Dict[str, Any]:
"""Get .NET project dependencies."""
try:
# Find all .csproj files
csproj_files = list(Path(self.path).glob("**/*.csproj"))
dependencies = {}
for csproj in csproj_files:
tree = ET.parse(csproj)
root = tree.getroot()
project_deps = []
for item_group in root.findall('.//PackageReference'):
project_deps.append({
'Include': item_group.get('Include'),
'Version': item_group.get('Version')
})
dependencies[csproj.stem] = project_deps
return dependencies
except Exception as e:
logger.error(f"Error parsing .NET dependencies: {e}")
return {}
def _get_go_dependencies(self) -> Dict[str, Any]:
"""Get Go project dependencies."""
go_mod_path = Path(self.path) / "go.mod"
if not go_mod_path.exists():
return {}
try:
result = subprocess.run(
['go', 'list', '-m', 'all'],
capture_output=True,
text=True,
cwd=self.path
)
if result.returncode == 0:
dependencies = []
for line in result.stdout.splitlines()[1:]: # Skip first line (module name)
parts = line.split()
if len(parts) >= 2:
dependencies.append({
'module': parts[0],
'version': parts[1]
})
return {'modules': dependencies}
except Exception as e:
logger.error(f"Error parsing Go dependencies: {e}")
return {}
async def update_dependencies(self, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Update project dependencies."""
if self.build_system == BuildSystem.MAVEN:
cmd = "mvn versions:use-latest-versions"
elif self.build_system == BuildSystem.GRADLE:
cmd = "./gradlew dependencyUpdates"
elif self.build_system == BuildSystem.NPM:
cmd = "npm update"
elif self.build_system == BuildSystem.YARN:
cmd = "yarn upgrade"
elif self.build_system == BuildSystem.POETRY:
cmd = "poetry update"
elif self.build_system == BuildSystem.DOTNET:
cmd = "dotnet restore"
else:
raise ProjectError(f"Dependency updates not supported for {self.build_system}")
return await self.execute_command(cmd)
async def get_project_analysis(self) -> Dict[str, Any]:
"""Get project analysis results."""
analysis = {
"structure": self.get_structure(),
"dependencies": self.get_dependencies(),
"metadata": {
"name": self.config["name"],
"type": self.project_type.name,
"build_system": self.build_system.value,
"config": self.config
}
}
# Add Git information if available
git_info = self.get_git_status()
if git_info.get("initialized", False):
analysis["git"] = git_info
# Add build/test status if available
if hasattr(self, 'last_build'):
analysis["last_build"] = self.last_build
if hasattr(self, 'last_test_run'):
analysis["last_test_run"] = self.last_test_run
return analysis
def get_structure(self) -> Dict[str, Any]:
"""Get project structure."""
def scan_dir(path: Path) -> Dict[str, Any]:
structure = {}
ignore_patterns = ['.git', '__pycache__', 'node_modules', 'target', 'build']
for item in path.iterdir():
if item.name in ignore_patterns:
continue
if item.is_file():
structure[item.name] = {
"type": "file",
"size": item.stat().st_size
}
elif item.is_dir():
structure[item.name] = {
"type": "directory",
"contents": scan_dir(item)
}
return structure
return scan_dir(Path(self.path))
async def cleanup(self):
"""Clean up project resources."""
try:
# Clean build artifacts
if self.build_system == BuildSystem.MAVEN:
await self.execute_command("mvn clean")
elif self.build_system == BuildSystem.GRADLE:
await self.execute_command("./gradlew clean")
elif self.build_system == BuildSystem.NPM:
await self.execute_command("npm run clean")
logger.info(f"Cleaned up project: {self.config['name']}")
except Exception as e:
logger.error(f"Project cleanup failed: {e}")
raise ProjectError(f"Cleanup failed: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/workflow.py:
--------------------------------------------------------------------------------
```python
"""Development workflow management for environments."""
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import asyncio
from ..utils.logging import setup_logging
from ..utils.errors import WorkflowError
logger = setup_logging(__name__)
class TaskStatus(str, Enum):
"""Workflow task status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class Task:
"""Represents a workflow task."""
def __init__(
self,
name: str,
command: str,
environment: str,
dependencies: Optional[List[str]] = None,
timeout: Optional[int] = None,
retry_count: int = 0,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None
):
self.name = name
self.command = command
self.environment = environment
self.dependencies = dependencies or []
self.timeout = timeout
self.retry_count = retry_count
self.status = TaskStatus.PENDING
self.result: Optional[Dict[str, Any]] = None
self.on_success = on_success
self.on_failure = on_failure
self.attempts = 0
class Workflow:
"""Manages development workflows."""
def __init__(self, env_manager):
self.env_manager = env_manager
self.tasks: Dict[str, Task] = {}
self.running = False
def add_task(self, task: Task) -> None:
"""Add a task to the workflow."""
self.tasks[task.name] = task
def remove_task(self, task_name: str) -> None:
"""Remove a task from the workflow."""
if task_name in self.tasks:
del self.tasks[task_name]
async def execute(self) -> Dict[str, Any]:
"""Execute the workflow."""
try:
self.running = True
results = {}
# Build dependency graph
graph = self._build_dependency_graph()
# Execute tasks in order
for task_group in graph:
# Execute tasks in group concurrently
tasks = [self._execute_task(task_name) for task_name in task_group]
group_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for task_name, result in zip(task_group, group_results):
if isinstance(result, Exception):
self.tasks[task_name].status = TaskStatus.FAILED
results[task_name] = {
"status": TaskStatus.FAILED,
"error": str(result)
}
else:
results[task_name] = result
return results
except Exception as e:
raise WorkflowError(f"Workflow execution failed: {str(e)}")
finally:
self.running = False
async def _execute_task(self, task_name: str) -> Dict[str, Any]:
"""Execute a single task."""
task = self.tasks[task_name]
# Check dependencies
for dep in task.dependencies:
dep_task = self.tasks.get(dep)
if not dep_task or dep_task.status != TaskStatus.COMPLETED:
task.status = TaskStatus.SKIPPED
return {
"status": TaskStatus.SKIPPED,
"reason": f"Dependency {dep} not satisfied"
}
task.status = TaskStatus.RUNNING
task.attempts += 1
try:
# Execute the command
result = await asyncio.wait_for(
self.env_manager.execute_in_environment(
task.environment,
task.command
),
timeout=task.timeout
)
# Handle execution result
if result['exit_code'] == 0:
task.status = TaskStatus.COMPLETED
if task.on_success:
await task.on_success(result)
return {
"status": TaskStatus.COMPLETED,
"result": result
}
else:
# Handle retry logic
if task.attempts < task.retry_count + 1:
logger.info(f"Retrying task {task_name} (attempt {task.attempts})")
return await self._execute_task(task_name)
task.status = TaskStatus.FAILED
if task.on_failure:
await task.on_failure(result)
return {
"status": TaskStatus.FAILED,
"result": result
}
except asyncio.TimeoutError:
task.status = TaskStatus.FAILED
return {
"status": TaskStatus.FAILED,
"error": "Task timeout"
}
except Exception as e:
task.status = TaskStatus.FAILED
return {
"status": TaskStatus.FAILED,
"error": str(e)
}
def _build_dependency_graph(self) -> List[List[str]]:
"""Build ordered list of task groups based on dependencies."""
# Initialize variables
graph: List[List[str]] = []
completed = set()
remaining = set(self.tasks.keys())
while remaining:
# Find tasks with satisfied dependencies
group = set()
for task_name in remaining:
task = self.tasks[task_name]
if all(dep in completed for dep in task.dependencies):
group.add(task_name)
if not group:
# Circular dependency detected
raise WorkflowError("Circular dependency detected in workflow")
# Add group to graph
graph.append(list(group))
completed.update(group)
remaining.difference_update(group)
return graph
def get_status(self) -> Dict[str, Any]:
"""Get workflow status."""
return {
"running": self.running,
"tasks": {
name: {
"status": task.status,
"attempts": task.attempts,
"dependencies": task.dependencies
}
for name, task in self.tasks.items()
}
}
def reset(self) -> None:
"""Reset workflow state."""
for task in self.tasks.values():
task.status = TaskStatus.PENDING
task.attempts = 0
task.result = None
self.running = False
# Example workflow definitions for common development tasks
class CommonWorkflows:
"""Predefined development workflows."""
@staticmethod
def create_build_workflow(env_manager, environment: str) -> Workflow:
"""Create a standard build workflow."""
workflow = Workflow(env_manager)
# Install dependencies
workflow.add_task(Task(
name="install_deps",
command="npm install",
environment=environment,
retry_count=2
))
# Run linter
workflow.add_task(Task(
name="lint",
command="npm run lint",
environment=environment,
dependencies=["install_deps"]
))
# Run tests
workflow.add_task(Task(
name="test",
command="npm run test",
environment=environment,
dependencies=["install_deps"]
))
# Build
workflow.add_task(Task(
name="build",
command="npm run build",
environment=environment,
dependencies=["lint", "test"]
))
return workflow
@staticmethod
def create_test_workflow(env_manager, environment: str) -> Workflow:
"""Create a standard test workflow."""
workflow = Workflow(env_manager)
# Install test dependencies
workflow.add_task(Task(
name="install_test_deps",
command="npm install --only=dev",
environment=environment,
retry_count=2
))
# Run unit tests
workflow.add_task(Task(
name="unit_tests",
command="npm run test:unit",
environment=environment,
dependencies=["install_test_deps"]
))
# Run integration tests
workflow.add_task(Task(
name="integration_tests",
command="npm run test:integration",
environment=environment,
dependencies=["install_test_deps"]
))
# Generate coverage report
workflow.add_task(Task(
name="coverage",
command="npm run coverage",
environment=environment,
dependencies=["unit_tests", "integration_tests"]
))
return workflow
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/tools.py:
--------------------------------------------------------------------------------
```python
"""Development tools integration for environments."""
import shutil
import subprocess
from typing import Dict, Optional, Any
from pathlib import Path
from ..utils.logging import setup_logging
from ..utils.errors import ToolError
logger = setup_logging(__name__)
class ToolManager:
"""Manages development tools in environments."""
def __init__(self, env_manager):
self.env_manager = env_manager
async def setup_package_manager(
self,
environment: str,
package_manager: str,
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Set up package manager in an environment."""
try:
config = config or {}
if package_manager == "npm":
return await self._setup_npm(environment, config)
elif package_manager == "pip":
return await self._setup_pip(environment, config)
else:
raise ToolError(f"Unsupported package manager: {package_manager}")
except Exception as e:
raise ToolError(f"Failed to setup package manager: {str(e)}")
async def setup_build_tool(
self,
environment: str,
build_tool: str,
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Set up build tool in an environment."""
try:
config = config or {}
if build_tool == "webpack":
return await self._setup_webpack(environment, config)
elif build_tool == "vite":
return await self._setup_vite(environment, config)
else:
raise ToolError(f"Unsupported build tool: {build_tool}")
except Exception as e:
raise ToolError(f"Failed to setup build tool: {str(e)}")
async def setup_test_framework(
self,
environment: str,
test_framework: str,
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Set up testing framework in an environment."""
try:
config = config or {}
if test_framework == "jest":
return await self._setup_jest(environment, config)
elif test_framework == "pytest":
return await self._setup_pytest(environment, config)
else:
raise ToolError(f"Unsupported test framework: {test_framework}")
except Exception as e:
raise ToolError(f"Failed to setup test framework: {str(e)}")
async def _setup_npm(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up NPM package manager."""
try:
# Initialize package.json if needed
if not config.get('skip_init'):
result = await self.env_manager.execute_in_environment(
environment,
'npm init -y'
)
if result['exit_code'] != 0:
raise ToolError(f"npm init failed: {result['error']}")
# Install dependencies if specified
if deps := config.get('dependencies'):
deps_str = ' '.join(deps)
result = await self.env_manager.execute_in_environment(
environment,
f'npm install {deps_str}'
)
if result['exit_code'] != 0:
raise ToolError(f"npm install failed: {result['error']}")
return {"status": "success"}
except Exception as e:
raise ToolError(f"NPM setup failed: {str(e)}")
async def _setup_pip(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up Pip package manager."""
try:
# Create virtual environment if needed
if not config.get('skip_venv'):
result = await self.env_manager.execute_in_environment(
environment,
'python -m venv .venv'
)
if result['exit_code'] != 0:
raise ToolError(f"venv creation failed: {result['error']}")
# Install dependencies if specified
if deps := config.get('dependencies'):
deps_str = ' '.join(deps)
result = await self.env_manager.execute_in_environment(
environment,
f'pip install {deps_str}'
)
if result['exit_code'] != 0:
raise ToolError(f"pip install failed: {result['error']}")
return {"status": "success"}
except Exception as e:
raise ToolError(f"Pip setup failed: {str(e)}")
async def _setup_webpack(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up Webpack build tool."""
try:
# Install webpack and dependencies
result = await self.env_manager.execute_in_environment(
environment,
'npm install webpack webpack-cli --save-dev'
)
if result['exit_code'] != 0:
raise ToolError(f"webpack installation failed: {result['error']}")
# Create webpack config if not exists
config_content = """
const path = require('path');
module.exports = {
entry: './src/index.js',
output: {
path: path.resolve(__dirname, 'dist'),
filename: 'bundle.js'
}
};
"""
config_path = Path(self.env_manager.environments[environment]['path']) / 'webpack.config.js'
config_path.write_text(config_content)
return {"status": "success"}
except Exception as e:
raise ToolError(f"Webpack setup failed: {str(e)}")
async def _setup_vite(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up Vite build tool."""
try:
# Install vite
result = await self.env_manager.execute_in_environment(
environment,
'npm install vite --save-dev'
)
if result['exit_code'] != 0:
raise ToolError(f"vite installation failed: {result['error']}")
# Create vite config if not exists
config_content = """
export default {
root: 'src',
build: {
outDir: '../dist'
}
}
"""
config_path = Path(self.env_manager.environments[environment]['path']) / 'vite.config.js'
config_path.write_text(config_content)
return {"status": "success"}
except Exception as e:
raise ToolError(f"Vite setup failed: {str(e)}")
async def _setup_jest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up Jest testing framework."""
try:
# Install jest and dependencies
result = await self.env_manager.execute_in_environment(
environment,
'npm install jest @types/jest --save-dev'
)
if result['exit_code'] != 0:
raise ToolError(f"jest installation failed: {result['error']}")
# Create jest config if not exists
config_content = """
module.exports = {
testEnvironment: 'node',
testMatch: ['**/*.test.js'],
collectCoverage: true
};
"""
config_path = Path(self.env_manager.environments[environment]['path']) / 'jest.config.js'
config_path.write_text(config_content)
return {"status": "success"}
except Exception as e:
raise ToolError(f"Jest setup failed: {str(e)}")
async def _setup_pytest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Set up Pytest testing framework."""
try:
# Install pytest and dependencies
result = await self.env_manager.execute_in_environment(
environment,
'pip install pytest pytest-cov'
)
if result['exit_code'] != 0:
raise ToolError(f"pytest installation failed: {result['error']}")
# Create pytest config if not exists
config_content = """
[pytest]
testpaths = tests
python_files = test_*.py
addopts = --cov=src
"""
config_path = Path(self.env_manager.environments[environment]['path']) / 'pytest.ini'
config_path.write_text(config_content)
return {"status": "success"}
except Exception as e:
raise ToolError(f"Pytest setup failed: {str(e)}")
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/manager.py:
--------------------------------------------------------------------------------
```python
"""Project management system for MCP Development Server."""
import asyncio
import json
from pathlib import Path
from typing import Dict, Any, Optional, List
import git
from .project_types import PROJECT_TYPES, ProjectType, BuildSystem
from .templates import TemplateManager
from ..prompts.project_templates import PROJECT_TEMPLATES
from ..utils.logging import setup_logging
from ..utils.errors import ProjectError
from ..docker.manager import DockerManager
logger = setup_logging(__name__)
class ProjectManager:
"""Manages development projects."""
def __init__(self, config):
"""Initialize project manager.
Args:
config: Server configuration instance
"""
self.config = config
self.template_manager = TemplateManager()
self.docker_manager = DockerManager()
self.current_project = None
self.projects = {}
def get_available_project_types(self) -> Dict[str, Dict[str, Any]]:
"""Get list of available project types.
Returns:
Dict[str, Dict[str, Any]]: Project type information
"""
return {
name: {
"name": pt.name,
"description": pt.description,
"build_systems": [bs.value for bs in pt.build_systems],
"default_build_system": pt.default_build_system.value
}
for name, pt in PROJECT_TYPES.items()
}
async def create_project(
self,
name: str,
project_type: str,
project_config: Dict[str, Any],
path: Optional[str] = None,
description: str = ""
) -> Any:
"""Create a new project.
Args:
name: Project name
project_type: Type of project (e.g., java, dotnet, node)
project_config: Project-specific configuration
path: Project directory path (optional)
description: Project description
Returns:
Project instance
"""
try:
if project_type not in PROJECT_TYPES:
raise ProjectError(f"Unsupported project type: {project_type}")
project_type_info = PROJECT_TYPES[project_type]
# Determine project path
if not path:
projects_dir = Path(self.config.get("projectsDir"))
path = str(projects_dir / name)
project_path = Path(path)
if project_path.exists():
raise ProjectError(f"Project path already exists: {path}")
# Create project directory
project_path.mkdir(parents=True, exist_ok=True)
# Create project configuration
project_config.update({
"name": name,
"type": project_type,
"description": description,
"build_system": project_config.get("build_system",
project_type_info.default_build_system.value)
})
# Save project configuration
config_path = project_path / "project.json"
with open(config_path, "w") as f:
json.dump(project_config, f, indent=2)
# Create project structure
await self._create_project_structure(project_path, project_type_info)
# Initialize build system
await self._initialize_build_system(
project_path,
project_type_info,
project_config
)
# Set up Docker environment if requested
if project_config.get("setup_docker", False):
await self._setup_docker_environment(
project_path,
project_type_info,
project_config
)
# Initialize Git repository if requested
if project_config.get("initialize_git", True):
repo = git.Repo.init(path)
repo.index.add("*")
repo.index.commit("Initial commit")
# Create project instance
project = await self._create_project_instance(
path,
project_config,
project_type_info
)
# Store project reference
self.projects[project.id] = project
self.current_project = project
logger.info(f"Created {project_type} project: {name} at {path}")
return project
except Exception as e:
logger.error(f"Failed to create project: {str(e)}")
raise ProjectError(f"Project creation failed: {str(e)}")
async def _create_project_structure(
self,
project_path: Path,
project_type: ProjectType
):
"""Create project directory structure.
Args:
project_path: Project directory path
project_type: Project type information
"""
def create_directory_structure(base_path: Path, structure: Dict[str, Any]):
for name, content in structure.items():
path = base_path / name
if isinstance(content, dict):
path.mkdir(exist_ok=True)
create_directory_structure(path, content)
create_directory_structure(project_path, project_type.file_structure)
async def _initialize_build_system(
self,
project_path: Path,
project_type: ProjectType,
project_config: Dict[str, Any]
):
"""Initialize project build system.
Args:
project_path: Project directory path
project_type: Project type information
project_config: Project configuration
"""
build_system = BuildSystem(project_config["build_system"])
# Generate build system configuration files
if build_system == BuildSystem.MAVEN:
await self.template_manager.generate_maven_pom(
project_path, project_config
)
elif build_system == BuildSystem.GRADLE:
await self.template_manager.generate_gradle_build(
project_path, project_config
)
elif build_system == BuildSystem.DOTNET:
await self.template_manager.generate_dotnet_project(
project_path, project_config
)
elif build_system in [BuildSystem.NPM, BuildSystem.YARN]:
await self.template_manager.generate_package_json(
project_path, project_config
)
elif build_system == BuildSystem.POETRY:
await self.template_manager.generate_pyproject_toml(
project_path, project_config
)
async def _setup_docker_environment(
self,
project_path: Path,
project_type: ProjectType,
project_config: Dict[str, Any]
):
"""Set up Docker environment for the project.
Args:
project_path: Project directory path
project_type: Project type information
project_config: Project configuration
"""
# Generate Dockerfile from template
dockerfile_template = project_type.docker_templates[0] # Use first template
dockerfile_content = await self.docker_manager.generate_dockerfile(
dockerfile_template,
project_config
)
dockerfile_path = project_path / "Dockerfile"
with open(dockerfile_path, "w") as f:
f.write(dockerfile_content)
# Generate docker-compose.yml if needed
if project_config.get("use_docker_compose", False):
services = {
"app": {
"build": ".",
"volumes": [
"./:/workspace"
],
"environment": project_type.environment_variables
}
}
compose_content = await self.docker_manager.create_compose_config(
project_config["name"],
services,
project_path / "docker-compose.yml"
)
async def _create_project_instance(
self,
path: str,
config: Dict[str, Any],
project_type: ProjectType
) -> Any:
"""Create project instance based on type.
Args:
path: Project directory path
config: Project configuration
project_type: Project type information
Returns:
Project instance
"""
# Import appropriate project class based on type
if project_type.name == "java":
from .java_project import JavaProject
return JavaProject(path, config, project_type)
elif project_type.name == "dotnet":
from .dotnet_project import DotNetProject
return DotNetProject(path, config, project_type)
elif project_type.name == "node":
from .node_project import NodeProject
return NodeProject(path, config, project_type)
elif project_type.name == "python":
from .python_project import PythonProject
return PythonProject(path, config, project_type)
elif project_type.name == "golang":
from .golang_project import GolangProject
return GolangProject(path, config, project_type)
else:
from .base_project import Project
return Project(path, config, project_type)
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/workflow/manager.py:
--------------------------------------------------------------------------------
```python
"""Development workflow management for MCP Development Server."""
from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
import asyncio
from ..utils.errors import WorkflowError
from ..utils.logging import setup_logging
logger = setup_logging(__name__)
class WorkflowStatus(str, Enum):
"""Workflow execution status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class WorkflowStep:
"""Individual step in a workflow."""
def __init__(
self,
name: str,
command: str,
environment: str,
depends_on: Optional[List[str]] = None,
timeout: Optional[int] = None,
retry_count: int = 0
):
self.name = name
self.command = command
self.environment = environment
self.depends_on = depends_on or []
self.timeout = timeout
self.retry_count = retry_count
self.status = WorkflowStatus.PENDING
self.result: Optional[Dict[str, Any]] = None
self.attempts = 0
class WorkflowManager:
"""Manages development workflows."""
def __init__(self, env_manager):
self.env_manager = env_manager
self.workflows: Dict[str, Dict[str, Any]] = {}
async def create_workflow(
self,
steps: List[WorkflowStep],
config: Optional[Dict[str, Any]] = None
) -> str:
"""Create a new workflow."""
try:
workflow_id = f"workflow_{len(self.workflows)}"
# Initialize workflow
self.workflows[workflow_id] = {
"steps": steps,
"config": config or {},
"status": WorkflowStatus.PENDING,
"start_time": None,
"end_time": None
}
return workflow_id
except Exception as e:
raise WorkflowError(f"Failed to create workflow: {str(e)}")
async def start_workflow(self, workflow_id: str) -> None:
"""Start workflow execution."""
try:
if workflow := self.workflows.get(workflow_id):
workflow["status"] = WorkflowStatus.RUNNING
workflow["start_time"] = datetime.now()
# Execute workflow steps
asyncio.create_task(self._execute_workflow(workflow_id))
else:
raise WorkflowError(f"Workflow not found: {workflow_id}")
except Exception as e:
raise WorkflowError(f"Failed to start workflow: {str(e)}")
async def _execute_workflow(self, workflow_id: str) -> None:
"""Execute workflow steps in order."""
workflow = self.workflows[workflow_id]
try:
# Build execution graph
graph = self._build_execution_graph(workflow["steps"])
# Execute steps in dependency order
for step_group in graph:
results = await asyncio.gather(
*[self._execute_step(workflow_id, step) for step in step_group],
return_exceptions=True
)
# Check for failures
if any(isinstance(r, Exception) for r in results):
workflow["status"] = WorkflowStatus.FAILED
return
workflow["status"] = WorkflowStatus.COMPLETED
except Exception as e:
logger.error(f"Workflow execution error: {str(e)}")
workflow["status"] = WorkflowStatus.FAILED
workflow["error"] = str(e)
finally:
workflow["end_time"] = datetime.now()
async def _execute_step(
self,
workflow_id: str,
step: WorkflowStep
) -> None:
"""Execute a single workflow step."""
try:
step.status = WorkflowStatus.RUNNING
step.attempts += 1
# Execute step command
result = await asyncio.wait_for(
self.env_manager.execute_in_environment(
step.environment,
step.command
),
timeout=step.timeout
)
# Handle step result
success = result["exit_code"] == 0
step.result = {
"output": result["output"],
"error": result.get("error"),
"exit_code": result["exit_code"]
}
if success:
step.status = WorkflowStatus.COMPLETED
else:
# Handle retry logic
if step.attempts < step.retry_count + 1:
logger.info(f"Retrying step {step.name} (attempt {step.attempts})")
return await self._execute_step(workflow_id, step)
step.status = WorkflowStatus.FAILED
except asyncio.TimeoutError:
step.status = WorkflowStatus.FAILED
step.result = {
"error": "Step execution timed out"
}
except Exception as e:
step.status = WorkflowStatus.FAILED
step.result = {
"error": str(e)
}
def _build_execution_graph(
self,
steps: List[WorkflowStep]
) -> List[List[WorkflowStep]]:
"""Build ordered list of step groups based on dependencies."""
# Initialize variables
graph: List[List[WorkflowStep]] = []
completed = set()
remaining = set(step.name for step in steps)
steps_by_name = {step.name: step for step in steps}
while remaining:
# Find steps with satisfied dependencies
group = set()
for step_name in remaining:
step = steps_by_name[step_name]
if all(dep in completed for dep in step.depends_on):
group.add(step_name)
if not group:
# Circular dependency detected
raise WorkflowError("Circular dependency detected in workflow steps")
# Add group to graph
graph.append([steps_by_name[name] for name in group])
completed.update(group)
remaining.difference_update(group)
return graph
async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get status and results of a workflow."""
if workflow := self.workflows.get(workflow_id):
return {
"id": workflow_id,
"status": workflow["status"],
"steps": [
{
"name": step.name,
"status": step.status,
"result": step.result,
"attempts": step.attempts
}
for step in workflow["steps"]
],
"start_time": workflow["start_time"],
"end_time": workflow["end_time"],
"error": workflow.get("error")
}
raise WorkflowError(f"Workflow not found: {workflow_id}")
def get_common_workflows(self) -> Dict[str, List[WorkflowStep]]:
"""Get predefined common workflow templates."""
return {
"build": [
WorkflowStep(
name="install",
command="npm install",
environment="default"
),
WorkflowStep(
name="lint",
command="npm run lint",
environment="default",
depends_on=["install"]
),
WorkflowStep(
name="test",
command="npm test",
environment="default",
depends_on=["install"]
),
WorkflowStep(
name="build",
command="npm run build",
environment="default",
depends_on=["lint", "test"]
)
],
"test": [
WorkflowStep(
name="install_deps",
command="npm install",
environment="default"
),
WorkflowStep(
name="unit_tests",
command="npm run test:unit",
environment="default",
depends_on=["install_deps"]
),
WorkflowStep(
name="integration_tests",
command="npm run test:integration",
environment="default",
depends_on=["install_deps"]
),
WorkflowStep(
name="coverage",
command="npm run coverage",
environment="default",
depends_on=["unit_tests", "integration_tests"]
)
],
"release": [
WorkflowStep(
name="bump_version",
command="npm version patch",
environment="default"
),
WorkflowStep(
name="build",
command="npm run build",
environment="default",
depends_on=["bump_version"]
),
WorkflowStep(
name="publish",
command="npm publish",
environment="default",
depends_on=["build"]
)
]
}
```
--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/streams.py:
--------------------------------------------------------------------------------
```python
"""Container output streaming and file synchronization."""
import os
import time
import asyncio
import hashlib
import collections
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional, AsyncGenerator, Any
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from ..utils.logging import setup_logging
from ..utils.errors import StreamError, SyncError
logger = setup_logging(__name__)
class OutputFormat(str, Enum):
"""Output stream formats."""
STDOUT = "stdout"
STDERR = "stderr"
COMBINED = "combined"
FORMATTED = "formatted"
class StreamConfig:
"""Stream configuration."""
def __init__(
self,
format: OutputFormat = OutputFormat.COMBINED,
buffer_size: int = 1024,
filters: Optional[List[str]] = None,
timestamp: bool = False
):
self.format = format
self.buffer_size = buffer_size
self.filters = filters or []
self.timestamp = timestamp
class SyncConfig:
"""Synchronization configuration."""
def __init__(
self,
ignore_patterns: Optional[List[str]] = None,
sync_interval: float = 1.0,
atomic: bool = True
):
self.ignore_patterns = ignore_patterns or []
self.sync_interval = sync_interval
self.atomic = atomic
class StreamInfo:
"""Information about an active stream."""
def __init__(self, task: asyncio.Task, config: StreamConfig):
self.task = task
self.config = config
self.start_time = datetime.now()
class EnhancedOutputStreamManager:
"""Enhanced streaming output manager."""
def __init__(self, docker_manager):
self.docker_manager = docker_manager
self.active_streams: Dict[str, StreamInfo] = {}
self._buffer = collections.deque(maxlen=1000) # Keep last 1000 messages
async def start_stream(
self,
container_name: str,
command: str,
config: StreamConfig,
callback: Optional[callable] = None
) -> AsyncGenerator[str, None]:
"""Start enhanced output stream."""
try:
container = self.docker_manager.containers.get(container_name)
if not container:
raise StreamError(f"Container not found: {container_name}")
# Create execution with specified format
exec_result = container.exec_run(
command,
stream=True,
demux=True,
socket=True # Use socket for better streaming
)
async def stream_handler():
buffer = []
try:
async for data in exec_result.output:
# Apply format and filtering
processed_data = self._process_stream_data(data, config)
if processed_data:
buffer.extend(processed_data)
if len(buffer) >= config.buffer_size:
output = ''.join(buffer)
buffer.clear()
self._buffer.append(output)
if callback:
await callback(output)
yield output
except Exception as e:
logger.error(f"Stream processing error: {str(e)}")
raise StreamError(f"Stream processing error: {str(e)}")
finally:
if buffer:
output = ''.join(buffer)
self._buffer.append(output)
if callback:
await callback(output)
yield output
if container_name in self.active_streams:
del self.active_streams[container_name]
# Create and store stream task
stream_task = asyncio.create_task(stream_handler())
self.active_streams[container_name] = StreamInfo(stream_task, config)
async for output in stream_task:
yield output
except Exception as e:
logger.error(f"Failed to start stream: {str(e)}")
raise StreamError(f"Failed to start stream: {str(e)}")
def _process_stream_data(
self,
data: bytes,
config: StreamConfig
) -> Optional[str]:
"""Process stream data according to config."""
if not data:
return None
# Split streams if demuxed
stdout, stderr = data if isinstance(data, tuple) else (data, None)
# Apply format
if config.format == OutputFormat.STDOUT and stdout:
output = stdout.decode()
elif config.format == OutputFormat.STDERR and stderr:
output = stderr.decode()
elif config.format == OutputFormat.COMBINED:
output = ''
if stdout:
output += stdout.decode()
if stderr:
output += stderr.decode()
elif config.format == OutputFormat.FORMATTED:
output = self._format_output(stdout, stderr)
else:
return None
# Apply filters
for filter_pattern in config.filters:
if filter_pattern in output:
return None
# Add timestamp if requested
if config.timestamp:
output = f"[{datetime.now().isoformat()}] {output}"
return output
@staticmethod
def _format_output(stdout: Optional[bytes], stderr: Optional[bytes]) -> str:
"""Format output with colors and prefixes."""
output = []
if stdout:
output.append(f"\033[32m[OUT]\033[0m {stdout.decode()}")
if stderr:
output.append(f"\033[31m[ERR]\033[0m {stderr.decode()}")
return '\n'.join(output)
async def stop_stream(self, container_name: str) -> None:
"""Stop streaming from a container."""
if stream_info := self.active_streams.get(container_name):
stream_info.task.cancel()
try:
await stream_info.task
except asyncio.CancelledError:
pass
del self.active_streams[container_name]
class BiDirectionalSync:
"""Enhanced bi-directional file synchronization."""
def __init__(self, docker_manager):
self.docker_manager = docker_manager
self.sync_handlers: Dict[str, EnhancedSyncHandler] = {}
self.observer = Observer()
self.observer.start()
async def start_sync(
self,
container_name: str,
host_path: str,
container_path: str,
config: SyncConfig
) -> None:
"""Start bi-directional file sync."""
try:
# Validate paths
if not os.path.exists(host_path):
raise SyncError(f"Host path does not exist: {host_path}")
container = self.docker_manager.containers.get(container_name)
if not container:
raise SyncError(f"Container not found: {container_name}")
# Create sync handler
handler = EnhancedSyncHandler(
container=container,
container_path=container_path,
host_path=host_path,
config=config
)
# Start watching both directions
self.observer.schedule(
handler,
host_path,
recursive=True
)
# Start container file watcher
await handler.start_container_watcher()
self.sync_handlers[container_name] = handler
logger.info(f"Started bi-directional sync for container: {container_name}")
except Exception as e:
raise SyncError(f"Failed to start sync: {str(e)}")
async def stop_sync(self, container_name: str) -> None:
"""Stop synchronization for a container."""
if handler := self.sync_handlers.get(container_name):
self.observer.unschedule_all()
await handler.stop_container_watcher()
del self.sync_handlers[container_name]
logger.info(f"Stopped sync for container: {container_name}")
async def cleanup(self) -> None:
"""Clean up all synchronization handlers."""
for container_name in list(self.sync_handlers.keys()):
await self.stop_sync(container_name)
self.observer.stop()
self.observer.join()
class EnhancedSyncHandler(FileSystemEventHandler):
"""Enhanced sync handler with bi-directional support."""
def __init__(
self,
container,
container_path: str,
host_path: str,
config: SyncConfig
):
super().__init__()
self.container = container
self.container_path = container_path
self.host_path = host_path
self.config = config
self.sync_lock = asyncio.Lock()
self.pending_syncs: Dict[str, float] = {}
self._container_watcher: Optional[asyncio.Task] = None
async def start_container_watcher(self) -> None:
"""Start watching container files."""
cmd = f"""
inotifywait -m -r -e modify,create,delete,move {self.container_path}
"""
exec_result = self.container.exec_run(
cmd,
stream=True,
detach=True
)
self._container_watcher = asyncio.create_task(
self._handle_container_events(exec_result.output)
)
async def stop_container_watcher(self) -> None:
"""Stop container file watcher."""
if self._container_watcher:
self._container_watcher.cancel()
try:
await self._container_watcher
except asyncio.CancelledError:
pass
self._container_watcher = None
async def _handle_container_events(self, output_stream: AsyncGenerator) -> None:
"""Handle container file events."""
try:
async for event in output_stream:
await self._handle_container_change(event.decode())
except Exception as e:
logger.error(f"Container watcher error: {str(e)}")
async def _handle_container_change(self, event: str) -> None:
"""Handle container file change."""
try:
# Parse inotify event
parts = event.strip().split()
if len(parts) >= 3:
path = parts[0]
change_type = parts[1]
filename = parts[2]
container_path = os.path.join(path, filename)
host_path = self._container_to_host_path(container_path)
# Apply filters
if self._should_ignore(host_path):
return
async with self.sync_lock:
# Check if change is from host sync
if host_path in self.pending_syncs:
if time.time() - self.pending_syncs[host_path] < self.config.sync_interval:
return
# Sync from container to host
await self._sync_to_host(container_path, host_path)
except Exception as e:
logger.error(f"Error handling container change: {str(e)}")
def _container_to_host_path(self, container_path: str) -> str:
"""Convert container path to host path."""
rel_path = os.path.relpath(container_path, self.container_path)
return os.path.join(self.host_path, rel_path)
def _should_ignore(self, path: str) -> bool:
"""Check if path should be ignored."""
return any(pattern in path for pattern in self.config.ignore_patterns)
async def _sync_to_host(
self,
container_path: str,
host_path: str
) -> None:
"""Sync file from container to host."""
try:
# Get file from container
stream, stat = self.container.get_archive(container_path)
# Create parent directories
os.makedirs(os.path.dirname(host_path), exist_ok=True)
if self.config.atomic:
# Save file atomically using temporary file
tmp_path = f"{host_path}.tmp"
with open(tmp_path, 'wb') as f:
for chunk in stream:
f.write(chunk)
os.rename(tmp_path, host_path)
else:
# Direct write
with open(host_path, 'wb') as f:
for chunk in stream:
f.write(chunk)
# Update sync tracking
self.pending_syncs[host_path] = time.time()
except Exception as e:
logger.error(f"Error syncing to host: {str(e)}")
raise SyncError(f"Failed to sync file {container_path}: {str(e)}")
```