# Directory Structure ``` ├── .gitignore ├── .python-version ├── package.json ├── pyproject.toml ├── README.md ├── setup.py ├── src │ ├── mcp_dev_server │ │ ├── __init__.py │ │ ├── __main__.py │ │ ├── core │ │ │ ├── __init__.py │ │ │ └── server.py │ │ ├── docker │ │ │ ├── manager.py │ │ │ ├── streams.py │ │ │ ├── templates │ │ │ │ ├── dev.dockerfile │ │ │ │ ├── node.dockerfile │ │ │ │ └── python.dockerfile │ │ │ ├── templates.py │ │ │ ├── volumes.py │ │ │ └── xxx.py │ │ ├── environments │ │ │ ├── manager.py │ │ │ ├── tools.py │ │ │ └── workflow.py │ │ ├── handlers │ │ │ ├── __init__.py │ │ │ └── input_request_handler.py │ │ ├── managers │ │ │ ├── __init__.py │ │ │ ├── base_manager.py │ │ │ ├── build_manager.py │ │ │ ├── dependency_manager.py │ │ │ ├── project_manager.py │ │ │ ├── template_manager.py │ │ │ ├── test_manager.py │ │ │ └── workflow_manager.py │ │ ├── models │ │ │ ├── __init__.py │ │ │ ├── config.py │ │ │ ├── errors.py │ │ │ └── input_response.py │ │ ├── package │ │ │ └── manager.py │ │ ├── project_manager │ │ │ ├── base_project.py │ │ │ ├── context.py │ │ │ ├── git.py │ │ │ ├── manager.py │ │ │ ├── project_types.py │ │ │ ├── project.py │ │ │ └── templates.py │ │ ├── prompts │ │ │ ├── handler.py │ │ │ ├── input_protocol.py │ │ │ ├── project_templates.py │ │ │ └── templates.py │ │ ├── server.py │ │ ├── test │ │ │ └── manager.py │ │ ├── utils │ │ │ ├── __init__.py │ │ │ ├── config.py │ │ │ ├── errors.py │ │ │ └── logging.py │ │ └── workflow │ │ └── manager.py │ └── resources │ └── templates │ └── basic │ └── files ├── tests │ └── test_integration.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python __pycache__/ *.py[cod] *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg # Virtual environments .env .venv env/ venv/ ENV/ # IDE .idea/ .vscode/ *.swp *.swo # Project specific *.log .docker/ .pytest_cache/ .coverage htmlcov/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # MCP Development Server A Model Context Protocol (MCP) server that enables Claude to manage software development projects, providing complete project context awareness and handling code execution through Docker environments. ## Features ### Core Infrastructure - Project context management - File system operations - Template-based project creation - Git integration ### Requirements - Python 3.12 or higher - Docker - Git ## Installation ```bash # Using pip pip install mcp-dev-server # Development installation git clone https://github.com/your-org/mcp-dev-server.git cd mcp-dev-server pip install -e . ``` ## Configuration ### Claude Desktop Configuration Add to your Claude Desktop configuration file: On MacOS: `~/Library/Application Support/Claude/claude_desktop_config.json` On Windows: `%APPDATA%/Claude/claude_desktop_config.json` ```json { "mcpServers": { "dev": { "command": "mcp-dev-server", "args": [] } } } ``` ## Usage The server provides several MCP capabilities: ### Resources - Project structure and files - Build status and artifacts - Test results - Docker container status ### Tools - Project initialization - Build operations - Test execution - Docker commands ### Prompts - Project analysis - Development suggestions - Error diagnosis ## Development ### Setting up development environment ```bash # Create virtual environment python -m venv .venv source .venv/bin/activate # On Windows: .venv\Scripts\activate # Install dependencies pip install -e ".[dev]" ``` ### Running tests ```bash pytest tests/ ``` ## Contributing Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct and the process for submitting pull requests. ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/xxx.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/utils/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/core/__init__.py: -------------------------------------------------------------------------------- ```python from .server import Server __all__ = ['Server'] ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/prompts/input_protocol.py: -------------------------------------------------------------------------------- ```python """Input request protocol for MCP server.""" [Previous content...] ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/handlers/__init__.py: -------------------------------------------------------------------------------- ```python from .input_request_handler import InputRequestHandler __all__ = ['InputRequestHandler'] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["setuptools>=45", "wheel"] build-backend = "setuptools.build_meta" ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/models/errors.py: -------------------------------------------------------------------------------- ```python class MCPDevServerError(Exception): """Base exception class for MCP Development Server errors.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/__main__.py: -------------------------------------------------------------------------------- ```python """Main entry point when run with python -m mcp_dev_server""" from . import main if __name__ == '__main__': main() ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/test_manager.py: -------------------------------------------------------------------------------- ```python class TestManager: """Manager class for test-related operations.""" def __init__(self): """Initialize the test manager.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/build_manager.py: -------------------------------------------------------------------------------- ```python class BuildManager: """Manager class for build-related operations.""" def __init__(self): """Initialize the build manager.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/models/__init__.py: -------------------------------------------------------------------------------- ```python from .config import Config from .input_response import InputResponse from .errors import MCPDevServerError __all__ = ['Config', 'InputResponse', 'MCPDevServerError'] ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/template_manager.py: -------------------------------------------------------------------------------- ```python class TemplateManager: """Manager class for template-related operations.""" def __init__(self): """Initialize the template manager.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/workflow_manager.py: -------------------------------------------------------------------------------- ```python class WorkflowManager: """Manager class for workflow-related operations.""" def __init__(self): """Initialize the workflow manager.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/dependency_manager.py: -------------------------------------------------------------------------------- ```python class DependencyManager: """Manager class for dependency-related operations.""" def __init__(self): """Initialize the dependency manager.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/project_manager.py: -------------------------------------------------------------------------------- ```python from ..models import Config class ProjectManager: """Manager class for project-related operations.""" def __init__(self, config: Config): """Initialize the project manager. Args: config: Server configuration """ self.config = config ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/__init__.py: -------------------------------------------------------------------------------- ```python from .project_manager import ProjectManager from .template_manager import TemplateManager from .build_manager import BuildManager from .dependency_manager import DependencyManager from .test_manager import TestManager from .workflow_manager import WorkflowManager __all__ = [ 'ProjectManager', 'TemplateManager', 'BuildManager', 'DependencyManager', 'TestManager', 'WorkflowManager' ] ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/managers/base_manager.py: -------------------------------------------------------------------------------- ```python """Base manager class with common functionality.""" import uuid from typing import Dict, Any class BaseManager: """Base class for all managers.""" def _generate_id(self) -> str: """Generate a unique identifier. Returns: str: Unique identifier """ return str(uuid.uuid4()) async def cleanup(self): """Clean up resources. Override in subclasses.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/__init__.py: -------------------------------------------------------------------------------- ```python """MCP Development Server Package.""" from . import server import asyncio from typing import Optional from .utils.logging import setup_logging logger = setup_logging(__name__) def main(): """Main entry point for the package.""" try: server_instance = server.MCPDevServer() asyncio.run(server_instance.run()) except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {str(e)}") raise # Expose key components at package level __all__ = ['main', 'server'] ``` -------------------------------------------------------------------------------- /setup.py: -------------------------------------------------------------------------------- ```python from setuptools import setup, find_packages setup( name="mcp-dev-server", version="0.1.0", packages=find_packages(where="src"), package_dir={"": "src"}, install_requires=[ "mcp", # Base MCP package "aiohttp>=3.8.0", "websockets>=10.0", "uvicorn>=0.15.0", "fastapi>=0.68.0", "typing_extensions>=4.5.0", ], entry_points={ "console_scripts": [ "mcp-dev-server=mcp_dev_server:main", ], }, python_requires=">=3.8", author="Your Name", description="MCP Development Server" ) ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/models/input_response.py: -------------------------------------------------------------------------------- ```python from typing import Any, Dict class InputResponse: """Class representing a user's input response.""" def __init__(self, request_id: str, values: Dict[str, Any]): """Initialize an input response. Args: request_id: ID of the input request values: Dictionary of input values """ self.request_id = request_id self.values = values def validate(self) -> bool: """Validate the input response. Returns: bool: True if valid, False otherwise """ return True # TODO: Implement validation ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/utils/errors.py: -------------------------------------------------------------------------------- ```python """Error definitions for MCP Development Server.""" class MCPDevServerError(Exception): """Base error class for MCP Development Server.""" pass class ProjectError(MCPDevServerError): """Project-related errors.""" pass class BuildError(MCPDevServerError): """Build-related errors.""" pass class TestError(MCPDevServerError): """Test-related errors.""" pass class EnvironmentError(MCPDevServerError): """Environment-related errors.""" pass class ConfigurationError(MCPDevServerError): """Configuration-related errors.""" pass class WorkflowError(MCPDevServerError): """Workflow-related errors.""" pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/templates/node.dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Node.js development environment FROM node:{{ node_version }} # Install system dependencies RUN apt-get update && apt-get install -y \ git \ curl \ && rm -rf /var/lib/apt/lists/* # Set working directory WORKDIR /workspace {% if package_file %} # Install Node.js dependencies COPY {{ package_file }} . {% if package_lock %} COPY {{ package_lock }} . RUN npm ci {% else %} RUN npm install {% endif %} {% endif %} {% if global_packages %} # Install global packages RUN npm install -g {% for package in global_packages %}{{ package }} {% endfor %} {% endif %} # Set Node.js environment variables ENV NODE_ENV=development {% if command %} # Default command CMD {{ command }} {% endif %} ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/models/config.py: -------------------------------------------------------------------------------- ```python class Config: """Configuration class for MCP Development Server.""" def __init__(self): """Initialize configuration with default values.""" self.host = "localhost" self.port = 8000 self.debug = False def load_from_file(self, file_path: str): """Load configuration from a file. Args: file_path: Path to configuration file """ pass # TODO: Implement configuration loading def save_to_file(self, file_path: str): """Save current configuration to a file. Args: file_path: Path to save configuration """ pass # TODO: Implement configuration saving ``` -------------------------------------------------------------------------------- /package.json: -------------------------------------------------------------------------------- ```json { "name": "mcp-dev-server", "version": "1.0.0", "description": "Model Context Protocol Development Server", "main": "dist/app.js", "scripts": { "start": "node dist/app.js", "dev": "nodemon src/app.ts", "build": "tsc", "test": "jest", "lint": "eslint . --ext .ts" }, "dependencies": { "express": "^4.18.2", "typescript": "^5.0.0", "mongoose": "^7.0.0", "dotenv": "^16.0.0", "winston": "^3.8.0", "cors": "^2.8.5", "helmet": "^6.0.0", "joi": "^17.0.0" }, "devDependencies": { "@types/express": "^4.17.17", "@types/node": "^18.0.0", "@types/jest": "^29.0.0", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^8.0.0", "jest": "^29.0.0", "nodemon": "^2.0.0", "ts-jest": "^29.0.0", "ts-node": "^10.0.0" } } ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/templates/python.dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Python development environment FROM python:{{ python_version }}-slim # Install system dependencies RUN apt-get update && apt-get install -y \ git \ curl \ build-essential \ && rm -rf /var/lib/apt/lists/* # Set working directory WORKDIR /workspace {% if install_poetry %} # Install Poetry RUN curl -sSL https://install.python-poetry.org | python3 - ENV PATH="/root/.local/bin:$PATH" {% endif %} {% if requirements_file %} # Install Python dependencies COPY {{ requirements_file }} . RUN pip install -r {{ requirements_file }} {% endif %} {% if additional_packages %} # Install additional packages RUN pip install {% for package in additional_packages %}{{ package }} {% endfor %} {% endif %} # Set Python environment variables ENV PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 {% if command %} # Default command CMD {{ command }} {% endif %} ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/utils/logging.py: -------------------------------------------------------------------------------- ```python """Logging configuration for MCP Development Server.""" import logging import sys from typing import Optional def setup_logging(name: Optional[str] = None, level: int = logging.INFO) -> logging.Logger: """Setup logging configuration. Args: name: Logger name level: Logging level Returns: logging.Logger: Configured logger instance """ # Create logger logger = logging.getLogger(name or __name__) logger.setLevel(level) # Create stderr handler (MCP protocol requires clean stdout) handler = logging.StreamHandler(sys.stderr) handler.setLevel(level) # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) # Add handler to logger logger.addHandler(handler) return logger ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/handlers/input_request_handler.py: -------------------------------------------------------------------------------- ```python from typing import Dict, Any, Optional from ..models import InputResponse class InputRequestHandler: """Handler for input requests.""" def __init__(self): """Initialize the input request handler.""" pass async def request_input(self, request_type: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Request input from the user. Args: request_type: Type of input request context: Additional context for request Returns: Dict[str, Any]: User's input values """ return {} # TODO: Implement input request handling def handle_response(self, response: InputResponse): """Handle input response from user. Args: response: User's response """ pass # TODO: Implement response handling ``` -------------------------------------------------------------------------------- /tests/test_integration.py: -------------------------------------------------------------------------------- ```python """Test MCP server integration with Claude.""" import asyncio import pytest from mcp_dev_server.server import MCPDevServer from mcp_dev_server.utils.config import Config @pytest.mark.asyncio async def test_server_initialization(): """Test server initialization.""" config = Config() server = MCPDevServer() # Test project creation project = await server.project_manager.create_project( name="test-project", project_type="python", project_config={ "python_version": "3.12", "project_type": "fastapi", "dependency_management": "poetry" } ) assert project is not None assert project.config["name"] == "test-project" # Test tool execution result = await server.handle_call_tool("build", { "environment": "default", "command": "build" }) assert result[0].type == "text" # Cleanup await server.cleanup() ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/templates.py: -------------------------------------------------------------------------------- ```python """Dockerfile templates for different environments.""" from typing import Dict, Optional from jinja2 import Template class DockerTemplates: """Manages Dockerfile templates for different environments.""" @staticmethod def get_template(environment: str, config: Optional[Dict[str, Any]] = None) -> str: """Get Dockerfile template for specific environment.""" config = config or {} if environment == "python": return Template(""" FROM python:{{ python_version|default('3.12-slim') }} WORKDIR /app {% if requirements_file %} COPY {{ requirements_file }} . RUN pip install -r {{ requirements_file }} {% endif %} {% if install_dev_deps %} RUN pip install pytest mypy black {% endif %} {% for cmd in additional_commands|default([]) %} RUN {{ cmd }} {% endfor %} COPY . . CMD ["python", "{{ entry_point|default('main.py') }}"] """).render(config) elif environment == "node": return Template(""" FROM node:{{ node_version|default('20-slim') }} WORKDIR /app COPY package*.json ./ RUN npm install {% if install_dev_deps %}--include=dev{% endif %} {% for cmd in additional_commands|default([]) %} RUN {{ cmd }} {% endfor %} COPY . . CMD ["npm", "{{ npm_command|default('start') }}"] """).render(config) else: raise ValueError(f"Unknown environment: {environment}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/templates/dev.dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Multi-language development environment FROM ubuntu:{{ ubuntu_version }} # Install system dependencies RUN apt-get update && apt-get install -y \ git \ curl \ build-essential \ software-properties-common \ && rm -rf /var/lib/apt/lists/* {% if install_python %} # Install Python RUN add-apt-repository ppa:deadsnakes/ppa && \ apt-get update && \ apt-get install -y python{{ python_version }} python{{ python_version }}-venv python{{ python_version }}-dev && \ rm -rf /var/lib/apt/lists/* {% endif %} {% if install_node %} # Install Node.js RUN curl -fsSL https://deb.nodesource.com/setup_{{ node_version }}.x | bash - && \ apt-get install -y nodejs && \ rm -rf /var/lib/apt/lists/* {% endif %} {% if install_docker %} # Install Docker RUN curl -fsSL https://get.docker.com | sh && \ rm -rf /var/lib/apt/lists/* {% endif %} # Set working directory WORKDIR /workspace {% if requirements_file %} # Install Python dependencies COPY {{ requirements_file }} . RUN pip{{ python_version }} install -r {{ requirements_file }} {% endif %} {% if package_file %} # Install Node.js dependencies COPY {{ package_file }} . {% if package_lock %} COPY {{ package_lock }} . RUN npm ci {% else %} RUN npm install {% endif %} {% endif %} {% if additional_tools %} # Install additional tools RUN apt-get update && apt-get install -y \ {% for tool in additional_tools %}{{ tool }} {% endfor %} \ && rm -rf /var/lib/apt/lists/* {% endif %} # Set environment variables ENV PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 \ NODE_ENV=development {% if command %} # Default command CMD {{ command }} {% endif %} ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/prompts/handler.py: -------------------------------------------------------------------------------- ```python [Previous handler.py content...] async def process_field_dependencies(self, request: InputRequest, field_updates: Dict[str, Any]): """Process field dependencies based on user input. Some fields might need to be updated based on values of other fields. For example, if user selects Python as language, we need to show Python version field. Args: request: Current input request field_updates: Updated field values """ try: if request.request_id == "environment_setup": language = field_updates.get("language") if language: # Update required fields based on language selection for field in request.fields: if field.name == "python_version": field.required = language in ["python", "both"] elif field.name == "node_version": field.required = language in ["node", "both"] elif request.request_id == "test_configuration": test_framework = field_updates.get("test_framework") if test_framework: # Update coverage options based on test framework for field in request.fields: if field.name == "include_coverage": field.options = self._get_coverage_options(test_framework) def _get_coverage_options(self, framework: str) -> List[Dict[str, str]]: """Get coverage tool options based on test framework.""" coverage_tools = { "pytest": [ {"value": "pytest-cov", "label": "pytest-cov"}, {"value": "coverage", "label": "coverage.py"} ], "unittest": [ {"value": "coverage", "label": "coverage.py"} ], "jest": [ {"value": "jest-coverage", "label": "Jest Coverage"} ], "mocha": [ {"value": "nyc", "label": "Istanbul/nyc"} ] } return coverage_tools.get(framework, []) ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/utils/config.py: -------------------------------------------------------------------------------- ```python """Configuration management for MCP Development Server.""" import os import json from typing import Dict, Any, Optional from pathlib import Path class Config: """Configuration manager.""" def __init__(self): """Initialize configuration.""" self.config_dir = self._get_config_dir() self.config_file = self.config_dir / "config.json" self.config: Dict[str, Any] = self._load_config() def _get_config_dir(self) -> Path: """Get configuration directory path.""" if os.name == "nt": # Windows config_dir = Path(os.getenv("APPDATA")) / "Claude" else: # macOS/Linux config_dir = Path.home() / ".config" / "claude" config_dir.mkdir(parents=True, exist_ok=True) return config_dir def _load_config(self) -> Dict[str, Any]: """Load configuration from file.""" if self.config_file.exists(): try: with open(self.config_file, "r") as f: return json.load(f) except Exception as e: print(f"Error loading config: {e}") return self._get_default_config() else: config = self._get_default_config() self._save_config(config) return config def _save_config(self, config: Dict[str, Any]): """Save configuration to file.""" try: with open(self.config_file, "w") as f: json.dump(config, f, indent=2) except Exception as e: print(f"Error saving config: {e}") def _get_default_config(self) -> Dict[str, Any]: """Get default configuration.""" return { "projectsDir": str(Path.home() / "Projects"), "templatesDir": str(self.config_dir / "templates"), "environments": { "default": { "type": "docker", "image": "python:3.12-slim" } } } def get(self, key: str, default: Any = None) -> Any: """Get configuration value.""" return self.config.get(key, default) def set(self, key: str, value: Any): """Set configuration value.""" self.config[key] = value self._save_config(self.config) def update(self, updates: Dict[str, Any]): """Update multiple configuration values.""" self.config.update(updates) self._save_config(self.config) ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/volumes.py: -------------------------------------------------------------------------------- ```python """Docker volume management for MCP Development Server.""" from typing import Dict, List, Optional import docker from docker.errors import DockerException from ..utils.logging import setup_logging from ..utils.errors import DockerError logger = setup_logging(__name__) class VolumeManager: """Manages Docker volumes for development environments.""" def __init__(self): self.client = docker.from_env() async def create_volume( self, name: str, labels: Optional[Dict[str, str]] = None ) -> str: """Create a Docker volume.""" try: volume = self.client.volumes.create( name=name, driver='local', labels=labels or {} ) logger.info(f"Created volume: {name}") return volume.name except DockerException as e: raise DockerError(f"Failed to create volume: {str(e)}") async def remove_volume(self, name: str) -> None: """Remove a Docker volume.""" try: volume = self.client.volumes.get(name) volume.remove() logger.info(f"Removed volume: {name}") except DockerException as e: raise DockerError(f"Failed to remove volume: {str(e)}") async def list_volumes( self, filters: Optional[Dict[str, str]] = None ) -> List[Dict[str, Any]]: """List Docker volumes.""" try: volumes = self.client.volumes.list(filters=filters or {}) return [ { "name": v.name, "driver": v.attrs['Driver'], "mountpoint": v.attrs['Mountpoint'], "labels": v.attrs['Labels'] or {} } for v in volumes ] except DockerException as e: raise DockerError(f"Failed to list volumes: {str(e)}") async def get_volume_info(self, name: str) -> Dict[str, Any]: """Get detailed information about a volume.""" try: volume = self.client.volumes.get(name) return { "name": volume.name, "driver": volume.attrs['Driver'], "mountpoint": volume.attrs['Mountpoint'], "labels": volume.attrs['Labels'] or {}, "scope": volume.attrs['Scope'], "status": volume.attrs.get('Status', {}) } except DockerException as e: raise DockerError(f"Failed to get volume info: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/package/manager.py: -------------------------------------------------------------------------------- ```python """Package management integration for MCP Development Server.""" from typing import Dict, List, Optional, Any from enum import Enum from ..utils.errors import PackageError from ..utils.logging import setup_logging logger = setup_logging(__name__) class PackageManager(str, Enum): """Supported package managers.""" NPM = "npm" PIP = "pip" CARGO = "cargo" class DependencyManager: """Manages project dependencies.""" def __init__(self, env_manager): self.env_manager = env_manager async def install_dependencies( self, environment: str, package_manager: PackageManager, dependencies: List[str], dev: bool = False ) -> Dict[str, Any]: """Install project dependencies.""" try: command = self._build_install_command( package_manager, dependencies, dev ) result = await self.env_manager.execute_in_environment( environment, command ) return { "success": result["exit_code"] == 0, "output": result["output"], "error": result.get("error") } except Exception as e: raise PackageError(f"Failed to install dependencies: {str(e)}") async def update_dependencies( self, environment: str, package_manager: PackageManager, dependencies: Optional[List[str]] = None ) -> Dict[str, Any]: """Update project dependencies.""" try: command = self._build_update_command(package_manager, dependencies) result = await self.env_manager.execute_in_environment( environment, command ) return { "success": result["exit_code"] == 0, "output": result["output"], "error": result.get("error") } except Exception as e: raise PackageError(f"Failed to update dependencies: {str(e)}") def _build_install_command( self, package_manager: PackageManager, dependencies: List[str], dev: bool ) -> str: """Build dependency installation command.""" if package_manager == PackageManager.NPM: dev_flag = "--save-dev" if dev else "" deps = " ".join(dependencies) return f"npm install {dev_flag} {deps}" elif package_manager == PackageManager.PIP: dev_flag = "-D" if dev else "" deps = " ".join(dependencies) return f"pip install {dev_flag} {deps}" elif package_manager == PackageManager.CARGO: dev_flag = "--dev" if dev else "" deps = " ".join(dependencies) return f"cargo add {dev_flag} {deps}" else: raise PackageError(f"Unsupported package manager: {package_manager}") def _build_update_command( self, package_manager: PackageManager, dependencies: Optional[List[str]] = None ) -> str: """Build dependency update command.""" if package_manager == PackageManager.NPM: return "npm update" if not dependencies else f"npm update {' '.join(dependencies)}" elif package_manager == PackageManager.PIP: return "pip install -U -r requirements.txt" if not dependencies else f"pip install -U {' '.join(dependencies)}" elif package_manager == PackageManager.CARGO: return "cargo update" if not dependencies else f"cargo update {' '.join(dependencies)}" else: raise PackageError(f"Unsupported package manager: {package_manager}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/test/manager.py: -------------------------------------------------------------------------------- ```python """Test system integration for MCP Development Server.""" import asyncio from typing import Dict, List, Optional, Any from enum import Enum from datetime import datetime from ..utils.errors import TestError from ..utils.logging import setup_logging logger = setup_logging(__name__) class TestStatus(str, Enum): """Test execution status.""" PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" ERROR = "error" class TestManager: """Manages test execution and reporting.""" def __init__(self, env_manager): self.env_manager = env_manager self.test_runs: Dict[str, Dict[str, Any]] = {} async def run_tests( self, environment: str, config: Dict[str, Any] ) -> str: """Start a test run.""" try: test_id = f"test_{len(self.test_runs)}" # Initialize test run self.test_runs[test_id] = { "environment": environment, "config": config, "status": TestStatus.PENDING, "results": [], "start_time": datetime.now(), "end_time": None } # Start test execution asyncio.create_task(self._execute_tests(test_id)) return test_id except Exception as e: raise TestError(f"Failed to start tests: {str(e)}") async def _execute_tests(self, test_id: str) -> None: """Execute test suite.""" try: test_run = self.test_runs[test_id] test_run["status"] = TestStatus.RUNNING # Run test command result = await self.env_manager.execute_in_environment( test_run["environment"], test_run["config"].get("command", "npm test"), workdir=test_run["config"].get("workdir") ) # Parse and store results test_run["results"] = self._parse_test_output( result["output"], test_run["config"].get("format", "jest") ) # Update test status test_run["end_time"] = datetime.now() test_run["status"] = ( TestStatus.SUCCESS if result["exit_code"] == 0 else TestStatus.FAILED ) except Exception as e: logger.error(f"Test execution error: {str(e)}") test_run["status"] = TestStatus.ERROR test_run["error"] = str(e) async def get_test_status(self, test_id: str) -> Dict[str, Any]: """Get status and results of a test run.""" if test_run := self.test_runs.get(test_id): return { "id": test_id, "status": test_run["status"], "results": test_run["results"], "start_time": test_run["start_time"], "end_time": test_run["end_time"], "error": test_run.get("error") } raise TestError(f"Test run not found: {test_id}") def _parse_test_output( self, output: str, format: str ) -> List[Dict[str, Any]]: """Parse test output into structured results.""" if format == "jest": return self._parse_jest_output(output) elif format == "pytest": return self._parse_pytest_output(output) else: logger.warning(f"Unknown test output format: {format}") return [{"raw_output": output}] def _parse_jest_output(self, output: str) -> List[Dict[str, Any]]: """Parse Jest test output.""" results = [] # Implement Jest output parsing return results def _parse_pytest_output(self, output: str) -> List[Dict[str, Any]]: """Parse pytest output.""" results = [] # Implement pytest output parsing return results ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/server.py: -------------------------------------------------------------------------------- ```python """MCP Development Server implementation.""" from typing import Dict, Any, Optional, Sequence import logging import sys import json # Import MCP components from mcp.server import Server as MCPServer from mcp.server.stdio import stdio_server import mcp.types as types from .models import Config, InputResponse, MCPDevServerError from .managers import ( ProjectManager, TemplateManager, BuildManager, DependencyManager, TestManager, WorkflowManager ) from .handlers import InputRequestHandler # Configure logging to stderr to keep stdout clean logger = logging.getLogger(__name__) handler = logging.StreamHandler(sys.stderr) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.DEBUG) # Set to DEBUG for development class MCPDevServer: """MCP Development Server implementation.""" def __init__(self): """Initialize the MCP Development Server.""" logger.info("Initializing MCP Development Server") try: # Initialize server self.server = MCPServer("mcp-dev-server") # Initialize configuration self.config = Config() # Initialize all managers self.project_manager = ProjectManager(self.config) self.template_manager = TemplateManager() self.build_manager = BuildManager() self.dependency_manager = DependencyManager() self.test_manager = TestManager() self.workflow_manager = WorkflowManager() self.input_handler = InputRequestHandler() # Setup request handlers self._setup_resource_handlers() self._setup_tool_handlers() self._setup_prompt_handlers() logger.info("Server initialization completed successfully") except Exception as e: logger.error(f"Failed to initialize server: {e}") raise def _setup_resource_handlers(self): """Set up resource request handlers.""" @self.server.list_resources() async def list_resources() -> list[types.Resource]: """List available resources.""" logger.debug("Listing resources") return [] @self.server.read_resource() async def read_resource(uri: str) -> str: """Read resource content.""" logger.debug(f"Reading resource: {uri}") return "" def _setup_tool_handlers(self): """Set up tool request handlers.""" @self.server.list_tools() async def list_tools() -> list[types.Tool]: """List available tools.""" logger.debug("Listing tools") return [] @self.server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> Sequence[types.TextContent]: """Execute a tool.""" logger.debug(f"Calling tool {name} with arguments {arguments}") return [types.TextContent(type="text", text="Tool execution result")] def _setup_prompt_handlers(self): """Set up prompt request handlers.""" @self.server.list_prompts() async def list_prompts() -> list[types.Prompt]: """List available prompts.""" logger.debug("Listing prompts") return [] async def run(self): """Run the MCP Development Server.""" try: logger.info(f"Starting {self.server.name}...") # Use stdio transport async with stdio_server() as streams: logger.info("Using stdio transport") await self.server.run( streams[0], # read stream streams[1], # write stream self.server.create_initialization_options(), raise_exceptions=True # Enable for debugging ) except Exception as e: logger.error(f"Server error: {str(e)}") raise MCPDevServerError(f"Server error: {str(e)}") finally: logger.info("Server shutdown") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/prompts/templates.py: -------------------------------------------------------------------------------- ```python """Input request templates for common scenarios.""" from typing import Dict from .input_protocol import InputRequest, InputField ENVIRONMENT_SETUP = InputRequest( request_id="environment_setup", title="Setup Development Environment", description="Configure your development environment", fields=[ InputField( name="language", type="select", description="Primary programming language", options=[ {"value": "python", "label": "Python"}, {"value": "node", "label": "Node.js"}, {"value": "both", "label": "Python & Node.js"} ] ), InputField( name="python_version", type="select", description="Python version", options=[ {"value": "3.12", "label": "Python 3.12"}, {"value": "3.11", "label": "Python 3.11"}, {"value": "3.10", "label": "Python 3.10"} ], required=False ), InputField( name="node_version", type="select", description="Node.js version", options=[ {"value": "20", "label": "Node.js 20 LTS"}, {"value": "18", "label": "Node.js 18 LTS"} ], required=False ), InputField( name="include_docker", type="confirm", description="Include Docker support?", default=False ) ] ) TEST_CONFIGURATION = InputRequest( request_id="test_configuration", title="Configure Test Environment", description="Set up testing parameters", fields=[ InputField( name="test_framework", type="select", description="Testing framework", options=[ {"value": "pytest", "label": "pytest"}, {"value": "unittest", "label": "unittest"}, {"value": "jest", "label": "Jest"}, {"value": "mocha", "label": "Mocha"} ] ), InputField( name="include_coverage", type="confirm", description="Include coverage reporting?", default=True ), InputField( name="parallel", type="confirm", description="Run tests in parallel?", default=False ), InputField( name="test_path", type="text", description="Test directory or file pattern", default="tests/", required=False ) ] ) DEPLOYMENT_CONFIG = InputRequest( request_id="deployment_config", title="Configure Deployment", description="Set up deployment parameters", fields=[ InputField( name="environment", type="select", description="Deployment environment", options=[ {"value": "development", "label": "Development"}, {"value": "staging", "label": "Staging"}, {"value": "production", "label": "Production"} ] ), InputField( name="deploy_method", type="select", description="Deployment method", options=[ {"value": "docker", "label": "Docker Container"}, {"value": "kubernetes", "label": "Kubernetes"}, {"value": "serverless", "label": "Serverless"} ] ), InputField( name="auto_deploy", type="confirm", description="Enable automatic deployment?", default=False ), InputField( name="rollback_enabled", type="confirm", description="Enable automatic rollback?", default=True ) ] ) DEBUG_CONFIG = InputRequest( request_id="debug_config", title="Configure Debugging Session", description="Set up debugging parameters", fields=[ InputField( name="debug_type", type="select", description="Type of debugging", options=[ {"value": "python", "label": "Python Debugger"}, {"value": "node", "label": "Node.js Debugger"}, {"value": "remote", "label": "Remote Debugging"} ] ), InputField( name="port", type="number", description="Debug port", default=9229, validation={"min": 1024, "max": 65535} ), InputField( name="break_on_entry", type="confirm", description="Break on entry point?", default=True ) ] ) TEMPLATE_REQUESTS: Dict[str, InputRequest] = { "environment_setup": ENVIRONMENT_SETUP, "test_configuration": TEST_CONFIGURATION, "deployment_config": DEPLOYMENT_CONFIG, "debug_config": DEBUG_CONFIG } ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/project_types.py: -------------------------------------------------------------------------------- ```python """Project type definitions and configurations.""" from typing import Dict, Any, List from enum import Enum class BuildSystem(str, Enum): """Build system types.""" MAVEN = "maven" GRADLE = "gradle" NPM = "npm" YARN = "yarn" PIP = "pip" POETRY = "poetry" DOTNET = "dotnet" CARGO = "cargo" GO = "go" SBT = "sbt" class ProjectType: """Base project type configuration.""" def __init__( self, name: str, description: str, file_structure: Dict[str, Any], build_systems: List[BuildSystem], default_build_system: BuildSystem, config_files: List[str], environment_variables: Dict[str, str], docker_templates: List[str], input_templates: List[str] ): self.name = name self.description = description self.file_structure = file_structure self.build_systems = build_systems self.default_build_system = default_build_system self.config_files = config_files self.environment_variables = environment_variables self.docker_templates = docker_templates self.input_templates = input_templates # Define standard project types JAVA_PROJECT = ProjectType( name="java", description="Java project", file_structure={ "src/": { "main/": { "java/": {}, "resources/": {} }, "test/": { "java/": {}, "resources/": {} } }, "target/": {}, }, build_systems=[BuildSystem.MAVEN, BuildSystem.GRADLE], default_build_system=BuildSystem.MAVEN, config_files=["pom.xml", "build.gradle", ".gitignore", "README.md"], environment_variables={ "JAVA_HOME": "", "MAVEN_HOME": "", "GRADLE_HOME": "" }, docker_templates=["java-maven", "java-gradle"], input_templates=["java_config", "maven_config", "gradle_config"] ) DOTNET_PROJECT = ProjectType( name="dotnet", description=".NET project", file_structure={ "src/": {}, "tests/": {}, "docs/": {} }, build_systems=[BuildSystem.DOTNET], default_build_system=BuildSystem.DOTNET, config_files=[".csproj", ".sln", "global.json", ".gitignore", "README.md"], environment_variables={ "DOTNET_ROOT": "", "ASPNETCORE_ENVIRONMENT": "Development" }, docker_templates=["dotnet-sdk", "dotnet-runtime"], input_templates=["dotnet_config", "aspnet_config"] ) NODE_PROJECT = ProjectType( name="node", description="Node.js project", file_structure={ "src/": {}, "tests/": {}, "dist/": {}, "public/": {} }, build_systems=[BuildSystem.NPM, BuildSystem.YARN], default_build_system=BuildSystem.NPM, config_files=["package.json", "tsconfig.json", ".gitignore", "README.md"], environment_variables={ "NODE_ENV": "development", "NPM_TOKEN": "" }, docker_templates=["node-dev", "node-prod"], input_templates=["node_config", "npm_config", "typescript_config"] ) PYTHON_PROJECT = ProjectType( name="python", description="Python project", file_structure={ "src/": {}, "tests/": {}, "docs/": {}, "notebooks/": {} }, build_systems=[BuildSystem.PIP, BuildSystem.POETRY], default_build_system=BuildSystem.POETRY, config_files=["pyproject.toml", "setup.py", "requirements.txt", ".gitignore", "README.md"], environment_variables={ "PYTHONPATH": "src", "PYTHON_ENV": "development" }, docker_templates=["python-dev", "python-prod"], input_templates=["python_config", "poetry_config", "pytest_config"] ) GOLANG_PROJECT = ProjectType( name="golang", description="Go project", file_structure={ "cmd/": {}, "internal/": {}, "pkg/": {}, "api/": {} }, build_systems=[BuildSystem.GO], default_build_system=BuildSystem.GO, config_files=["go.mod", "go.sum", ".gitignore", "README.md"], environment_variables={ "GOPATH": "", "GO111MODULE": "on" }, docker_templates=["golang-dev", "golang-prod"], input_templates=["golang_config", "go_mod_config"] ) RUST_PROJECT = ProjectType( name="rust", description="Rust project", file_structure={ "src/": {}, "tests/": {}, "benches/": {}, "examples/": {} }, build_systems=[BuildSystem.CARGO], default_build_system=BuildSystem.CARGO, config_files=["Cargo.toml", "Cargo.lock", ".gitignore", "README.md"], environment_variables={ "RUST_BACKTRACE": "1", "CARGO_HOME": "" }, docker_templates=["rust-dev", "rust-prod"], input_templates=["rust_config", "cargo_config"] ) # Map of all available project types PROJECT_TYPES: Dict[str, ProjectType] = { "java": JAVA_PROJECT, "dotnet": DOTNET_PROJECT, "node": NODE_PROJECT, "python": PYTHON_PROJECT, "golang": GOLANG_PROJECT, "rust": RUST_PROJECT } ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/git.py: -------------------------------------------------------------------------------- ```python """Git integration for MCP Development Server.""" import os from typing import List, Optional from git import Repo, GitCommandError from git.objects import Commit from ..utils.logging import setup_logging from ..utils.errors import GitError logger = setup_logging(__name__) class GitManager: """Manages Git operations for a project.""" def __init__(self, project_path: str): self.project_path = project_path self.repo: Optional[Repo] = None async def initialize(self) -> None: """Initialize Git repository.""" try: self.repo = Repo.init(self.project_path) # Create default .gitignore if it doesn't exist gitignore_path = os.path.join(self.project_path, '.gitignore') if not os.path.exists(gitignore_path): with open(gitignore_path, 'w') as f: f.write('\n'.join([ '# Python', '__pycache__/', '*.pyc', '*.pyo', '*.pyd', '.Python', 'env/', 'venv/', '.env', '.venv', '', '# IDE', '.idea/', '.vscode/', '*.swp', '*.swo', '', '# Project specific', '.mcp/', 'dist/', 'build/', '*.egg-info/', '' ])) # Initial commit if not self.repo.heads: self.repo.index.add(['.gitignore']) self.repo.index.commit("Initial commit") logger.info(f"Initialized Git repository at {self.project_path}") except Exception as e: raise GitError(f"Git initialization failed: {str(e)}") async def get_status(self) -> dict: """Get repository status.""" try: if not self.repo: raise GitError("Git repository not initialized") return { "branch": self.repo.active_branch.name, "changed_files": [item.a_path for item in self.repo.index.diff(None)], "untracked_files": self.repo.untracked_files, "is_dirty": self.repo.is_dirty(), "head_commit": { "hash": self.repo.head.commit.hexsha, "message": self.repo.head.commit.message, "author": str(self.repo.head.commit.author), "date": str(self.repo.head.commit.authored_datetime) } } except Exception as e: raise GitError(f"Failed to get Git status: {str(e)}") async def commit(self, message: str, files: Optional[List[str]] = None) -> str: """Create a new commit.""" try: if not self.repo: raise GitError("Git repository not initialized") # Add specified files or all changes if files: self.repo.index.add(files) else: self.repo.index.add('.') # Create commit commit = self.repo.index.commit(message) logger.info(f"Created commit: {commit.hexsha}") return commit.hexsha except Exception as e: raise GitError(f"Failed to create commit: {str(e)}") async def get_commit_history( self, max_count: Optional[int] = None ) -> List[dict]: """Get commit history.""" try: if not self.repo: raise GitError("Git repository not initialized") commits = [] for commit in self.repo.iter_commits(max_count=max_count): commits.append({ "hash": commit.hexsha, "message": commit.message, "author": str(commit.author), "date": str(commit.authored_datetime), "files": list(commit.stats.files.keys()) }) return commits except Exception as e: raise GitError(f"Failed to get commit history: {str(e)}") async def create_branch(self, name: str) -> None: """Create a new branch.""" try: if not self.repo: raise GitError("Git repository not initialized") self.repo.create_head(name) logger.info(f"Created branch: {name}") except Exception as e: raise GitError(f"Failed to create branch: {str(e)}") async def checkout(self, branch: str) -> None: """Checkout a branch.""" try: if not self.repo: raise GitError("Git repository not initialized") self.repo.git.checkout(branch) logger.info(f"Checked out branch: {branch}") except Exception as e: raise GitError(f"Failed to checkout branch: {str(e)}") async def get_diff( self, commit_a: Optional[str] = None, commit_b: Optional[str] = None ) -> str: """Get diff between commits or working directory.""" try: if not self.repo: raise GitError("Git repository not initialized") return self.repo.git.diff(commit_a, commit_b) except Exception as e: raise GitError(f"Failed to get diff: {str(e)}") async def cleanup(self) -> None: """Clean up Git resources.""" self.repo = None ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/environments/manager.py: -------------------------------------------------------------------------------- ```python """Environment management for MCP Development Server.""" import os import json from typing import Dict, List, Optional, Any from pathlib import Path from ..docker.manager import DockerManager from ..docker.volumes import VolumeManager from ..docker.templates import DockerTemplates from ..utils.logging import setup_logging from ..utils.errors import EnvironmentError logger = setup_logging(__name__) class EnvironmentManager: """Manages development environments.""" def __init__(self): self.docker_manager = DockerManager() self.volume_manager = VolumeManager() self.environments: Dict[str, Dict[str, Any]] = {} async def create_environment( self, name: str, project_path: str, env_type: str, config: Optional[Dict[str, Any]] = None ) -> str: """Create a new development environment.""" try: config = config or {} # Create environment directory env_path = os.path.join(project_path, '.mcp', 'environments', name) os.makedirs(env_path, exist_ok=True) # Generate Dockerfile dockerfile_content = DockerTemplates.get_template(env_type, config) dockerfile_path = os.path.join(env_path, 'Dockerfile') with open(dockerfile_path, 'w') as f: f.write(dockerfile_content) # Create volumes for persistence volumes = {} for volume_name in ['src', 'deps', 'cache']: volume = await self.volume_manager.create_volume( f"mcp-{name}-{volume_name}", labels={ 'mcp.environment': name, 'mcp.volume.type': volume_name } ) volumes[volume] = {'bind': f'/app/{volume_name}', 'mode': 'rw'} # Create container container_id = await self.docker_manager.create_container( project_path=project_path, environment=name, dockerfile=dockerfile_path, volumes=volumes, environment_vars=config.get('env_vars'), ports=config.get('ports') ) # Store environment configuration self.environments[name] = { 'id': container_id, 'type': env_type, 'path': env_path, 'config': config, 'volumes': volumes } # Save environment metadata self._save_environment_metadata(name) logger.info(f"Created environment: {name}") return container_id except Exception as e: raise EnvironmentError(f"Failed to create environment: {str(e)}") async def remove_environment(self, name: str) -> None: """Remove a development environment.""" try: if env := self.environments.get(name): # Stop container await self.docker_manager.stop_container(name) # Remove volumes for volume in env['volumes']: await self.volume_manager.remove_volume(volume) # Remove environment directory import shutil shutil.rmtree(env['path']) # Remove from environments dict del self.environments[name] logger.info(f"Removed environment: {name}") else: raise EnvironmentError(f"Environment not found: {name}") except Exception as e: raise EnvironmentError(f"Failed to remove environment: {str(e)}") async def execute_in_environment( self, name: str, command: str, workdir: Optional[str] = None ) -> Dict[str, Any]: """Execute a command in an environment.""" try: if name not in self.environments: raise EnvironmentError(f"Environment not found: {name}") return await self.docker_manager.execute_command( environment=name, command=command, workdir=workdir ) except Exception as e: raise EnvironmentError(f"Failed to execute command: {str(e)}") async def get_environment_status(self, name: str) -> Dict[str, Any]: """Get environment status including container and volumes.""" try: if env := self.environments.get(name): container_status = await self.docker_manager.get_container_status(name) volumes_status = {} for volume in env['volumes']: volumes_status[volume] = await self.volume_manager.get_volume_info(volume) return { 'container': container_status, 'volumes': volumes_status, 'type': env['type'], 'config': env['config'] } else: raise EnvironmentError(f"Environment not found: {name}") except Exception as e: raise EnvironmentError(f"Failed to get environment status: {str(e)}") def _save_environment_metadata(self, name: str) -> None: """Save environment metadata to disk.""" if env := self.environments.get(name): metadata_path = os.path.join(env['path'], 'metadata.json') with open(metadata_path, 'w') as f: json.dump({ 'name': name, 'type': env['type'], 'config': env['config'], 'volumes': list(env['volumes'].keys()) }, f, indent=2) async def cleanup(self) -> None: """Clean up all environments.""" for name in list(self.environments.keys()): try: await self.remove_environment(name) except Exception as e: logger.error(f"Error cleaning up environment {name}: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/prompts/project_templates.py: -------------------------------------------------------------------------------- ```python """Project-specific input templates.""" from typing import Dict from .input_protocol import InputRequest, InputField # Java Project Templates JAVA_CONFIG = InputRequest( request_id="java_config", title="Java Project Configuration", description="Configure Java project settings", fields=[ InputField( name="java_version", type="select", description="Java version", options=[ {"value": "21", "label": "Java 21 (LTS)"}, {"value": "17", "label": "Java 17 (LTS)"}, {"value": "11", "label": "Java 11 (LTS)"}, {"value": "8", "label": "Java 8"} ] ), InputField( name="project_type", type="select", description="Project type", options=[ {"value": "spring-boot", "label": "Spring Boot"}, {"value": "jakarta-ee", "label": "Jakarta EE"}, {"value": "android", "label": "Android"}, {"value": "library", "label": "Java Library"} ] ), InputField( name="packaging", type="select", description="Packaging type", options=[ {"value": "jar", "label": "JAR"}, {"value": "war", "label": "WAR"}, {"value": "ear", "label": "EAR"} ] ) ] ) # .NET Project Templates DOTNET_CONFIG = InputRequest( request_id="dotnet_config", title=".NET Project Configuration", description="Configure .NET project settings", fields=[ InputField( name="dotnet_version", type="select", description=".NET version", options=[ {"value": "8.0", "label": ".NET 8.0"}, {"value": "7.0", "label": ".NET 7.0"}, {"value": "6.0", "label": ".NET 6.0 (LTS)"} ] ), InputField( name="project_type", type="select", description="Project type", options=[ {"value": "webapi", "label": "ASP.NET Core Web API"}, {"value": "mvc", "label": "ASP.NET Core MVC"}, {"value": "blazor", "label": "Blazor"}, {"value": "maui", "label": ".NET MAUI"}, {"value": "library", "label": "Class Library"} ] ), InputField( name="authentication", type="select", description="Authentication type", options=[ {"value": "none", "label": "None"}, {"value": "individual", "label": "Individual Accounts"}, {"value": "microsoft", "label": "Microsoft Identity Platform"}, {"value": "windows", "label": "Windows Authentication"} ] ) ] ) # Node.js Project Templates NODE_CONFIG = InputRequest( request_id="node_config", title="Node.js Project Configuration", description="Configure Node.js project settings", fields=[ InputField( name="node_version", type="select", description="Node.js version", options=[ {"value": "20", "label": "Node.js 20 (LTS)"}, {"value": "18", "label": "Node.js 18 (LTS)"} ] ), InputField( name="project_type", type="select", description="Project type", options=[ {"value": "express", "label": "Express.js"}, {"value": "next", "label": "Next.js"}, {"value": "nest", "label": "NestJS"}, {"value": "library", "label": "NPM Package"} ] ), InputField( name="typescript", type="confirm", description="Use TypeScript?", default=True ) ] ) # Python Project Templates PYTHON_CONFIG = InputRequest( request_id="python_config", title="Python Project Configuration", description="Configure Python project settings", fields=[ InputField( name="python_version", type="select", description="Python version", options=[ {"value": "3.12", "label": "Python 3.12"}, {"value": "3.11", "label": "Python 3.11"}, {"value": "3.10", "label": "Python 3.10"} ] ), InputField( name="project_type", type="select", description="Project type", options=[ {"value": "fastapi", "label": "FastAPI"}, {"value": "django", "label": "Django"}, {"value": "flask", "label": "Flask"}, {"value": "library", "label": "Python Package"} ] ), InputField( name="dependency_management", type="select", description="Dependency management", options=[ {"value": "poetry", "label": "Poetry"}, {"value": "pip", "label": "pip + requirements.txt"}, {"value": "pipenv", "label": "Pipenv"} ] ) ] ) # Golang Project Templates GOLANG_CONFIG = InputRequest( request_id="golang_config", title="Go Project Configuration", description="Configure Go project settings", fields=[ InputField( name="go_version", type="select", description="Go version", options=[ {"value": "1.22", "label": "Go 1.22"}, {"value": "1.21", "label": "Go 1.21"}, {"value": "1.20", "label": "Go 1.20"} ] ), InputField( name="project_type", type="select", description="Project type", options=[ {"value": "gin", "label": "Gin Web Framework"}, {"value": "echo", "label": "Echo Framework"}, {"value": "cli", "label": "CLI Application"}, {"value": "library", "label": "Go Module"} ] ), InputField( name="module_path", type="text", description="Module path (e.g., github.com/user/repo)", validation={"pattern": r"^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+(/[a-zA-Z0-9_.-]+)?$"} ) ] ) # All project templates PROJECT_TEMPLATES: Dict[str, InputRequest] = { "java_config": JAVA_CONFIG, "dotnet_config": DOTNET_CONFIG, "node_config": NODE_CONFIG, "python_config": PYTHON_CONFIG, "golang_config": GOLANG_CONFIG } ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/templates.py: -------------------------------------------------------------------------------- ```python """Template system for project creation.""" import os import shutil from pathlib import Path from typing import Dict, Any, List import jinja2 import yaml from ..utils.logging import setup_logging from ..utils.errors import ProjectError logger = setup_logging(__name__) class TemplateManager: """Manages project templates.""" def __init__(self): """Initialize template manager.""" self.template_dir = self._get_template_dir() self.env = jinja2.Environment( loader=jinja2.FileSystemLoader(str(self.template_dir)), autoescape=jinja2.select_autoescape() ) def _get_template_dir(self) -> Path: """Get templates directory path.""" if os.name == "nt": # Windows template_dir = Path(os.getenv("APPDATA")) / "Claude" / "templates" else: # macOS/Linux template_dir = Path.home() / ".config" / "claude" / "templates" template_dir.mkdir(parents=True, exist_ok=True) # Initialize with basic template if empty if not any(template_dir.iterdir()): self._initialize_basic_template(template_dir) return template_dir def _initialize_basic_template(self, template_dir: Path): """Initialize basic project template. Args: template_dir: Templates directory path """ basic_dir = template_dir / "basic" basic_dir.mkdir(exist_ok=True) # Create template configuration config = { "name": "basic", "description": "Basic project template", "version": "1.0.0", "files": [ "README.md", "requirements.txt", ".gitignore", "src/__init__.py", "tests/__init__.py" ], "variables": { "project_name": "", "description": "" }, "features": { "git": True, "tests": True, "docker": False } } with open(basic_dir / "template.yaml", "w") as f: yaml.dump(config, f) # Create template files readme_content = """# {{ project_name }} {{ description }} ## Installation ```bash pip install -r requirements.txt ``` ## Usage ```python from {{ project_name.lower() }} import main ``` ## Testing ```bash pytest tests/ ``` """ with open(basic_dir / "README.md", "w") as f: f.write(readme_content) # Create source directory src_dir = basic_dir / "src" src_dir.mkdir(exist_ok=True) with open(src_dir / "__init__.py", "w") as f: f.write('"""{{ project_name }} package."""\n') # Create tests directory tests_dir = basic_dir / "tests" tests_dir.mkdir(exist_ok=True) with open(tests_dir / "__init__.py", "w") as f: f.write('"""Tests for {{ project_name }}."""\n') # Create requirements.txt with open(basic_dir / "requirements.txt", "w") as f: f.write("pytest>=7.0.0\n") # Create .gitignore gitignore_content = """__pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST """ with open(basic_dir / ".gitignore", "w") as f: f.write(gitignore_content) async def apply_template(self, template_name: str, project: Any) -> None: """Apply template to project. Args: template_name: Name of template to apply project: Project instance """ try: template_path = self.template_dir / template_name if not template_path.exists(): raise ProjectError(f"Template not found: {template_name}") # Load template configuration with open(template_path / "template.yaml", "r") as f: template_config = yaml.safe_load(f) # Prepare template variables variables = { "project_name": project.config.name, "description": project.config.description } # Process each template file for file_path in template_config["files"]: template_file = template_path / file_path if template_file.exists(): # Create target directory if needed target_path = Path(project.path) / file_path target_path.parent.mkdir(parents=True, exist_ok=True) # Render template content template = self.env.get_template(f"{template_name}/{file_path}") content = template.render(**variables) # Write rendered content with open(target_path, "w") as f: f.write(content) logger.info(f"Applied template {template_name} to project {project.config.name}") except Exception as e: logger.error(f"Failed to apply template: {str(e)}") raise ProjectError(f"Template application failed: {str(e)}") async def template_has_git(self, template_name: str) -> bool: """Check if template includes Git initialization. Args: template_name: Template name Returns: bool: True if template includes Git """ try: template_path = self.template_dir / template_name if not template_path.exists(): return False # Load template configuration with open(template_path / "template.yaml", "r") as f: template_config = yaml.safe_load(f) return template_config.get("features", {}).get("git", False) except Exception: return False def list_templates(self) -> List[Dict[str, Any]]: """Get list of available templates. Returns: List[Dict[str, Any]]: Template information """ templates = [] for template_dir in self.template_dir.iterdir(): if template_dir.is_dir(): config_path = template_dir / "template.yaml" if config_path.exists(): with open(config_path, "r") as f: config = yaml.safe_load(f) templates.append(config) return templates ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/context.py: -------------------------------------------------------------------------------- ```python """Project context management for MCP Development Server.""" import os import json import uuid from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any from pydantic import BaseModel from ..utils.config import ProjectConfig from ..utils.logging import setup_logging from ..utils.errors import ProjectError, FileOperationError logger = setup_logging(__name__) class ProjectState(BaseModel): """Project state tracking.""" initialized: bool = False last_build_time: Optional[datetime] = None last_build_status: Optional[str] = None last_test_time: Optional[datetime] = None last_test_status: Optional[str] = None git_initialized: bool = False class ProjectContext: """Manages the context and state of a development project.""" def __init__(self, config: ProjectConfig): self.id = str(uuid.uuid4()) self.config = config self.path = config.path self.state = ProjectState() self._file_watchers: Dict[str, Any] = {} async def initialize(self) -> None: """Initialize project structure and state.""" try: # Create project directory os.makedirs(self.path, exist_ok=True) # Create project structure await self._create_project_structure() # Initialize state file await self._init_state_file() # Set up file watchers await self._setup_file_watchers() self.state.initialized = True logger.info(f"Initialized project {self.config.name} at {self.path}") except Exception as e: raise ProjectError(f"Project initialization failed: {str(e)}") async def _create_project_structure(self) -> None: """Create initial project directory structure.""" try: # Create standard directories for dir_name in ['.mcp', 'src', 'tests', 'docs']: os.makedirs(os.path.join(self.path, dir_name), exist_ok=True) # Create basic configuration files config_path = os.path.join(self.path, '.mcp', 'project.json') with open(config_path, 'w') as f: json.dump(self.config.dict(), f, indent=2, default=str) except Exception as e: raise FileOperationError(f"Failed to create project structure: {str(e)}") async def _init_state_file(self) -> None: """Initialize project state file.""" try: state_path = os.path.join(self.path, '.mcp', 'state.json') with open(state_path, 'w') as f: json.dump(self.state.dict(), f, indent=2, default=str) except Exception as e: raise FileOperationError(f"Failed to initialize state file: {str(e)}") async def _setup_file_watchers(self) -> None: """Set up file system watchers for project directories.""" # To be implemented with file watching functionality pass def get_structure(self) -> Dict[str, Any]: """Get project structure as a dictionary.""" structure = {"name": self.config.name, "type": "directory", "children": []} def scan_directory(path: Path, current_dict: Dict[str, Any]) -> None: try: for item in path.iterdir(): # Skip hidden files and .mcp directory if item.name.startswith('.'): continue if item.is_file(): current_dict["children"].append({ "name": item.name, "type": "file", "size": item.stat().st_size }) elif item.is_dir(): dir_dict = { "name": item.name, "type": "directory", "children": [] } current_dict["children"].append(dir_dict) scan_directory(item, dir_dict) except Exception as e: logger.error(f"Error scanning directory {path}: {str(e)}") scan_directory(Path(self.path), structure) return structure def get_file_content(self, relative_path: str) -> str: """Get content of a project file.""" try: file_path = os.path.join(self.path, relative_path) if not os.path.exists(file_path): raise FileOperationError(f"File not found: {relative_path}") # Basic security check if not os.path.normpath(file_path).startswith(str(self.path)): raise FileOperationError("Invalid file path") with open(file_path, 'r') as f: return f.read() except Exception as e: raise FileOperationError(f"Failed to read file {relative_path}: {str(e)}") async def update_file(self, relative_path: str, content: str) -> None: """Update content of a project file.""" try: file_path = os.path.join(self.path, relative_path) # Create directories if needed os.makedirs(os.path.dirname(file_path), exist_ok=True) # Security check if not os.path.normpath(file_path).startswith(str(self.path)): raise FileOperationError("Invalid file path") with open(file_path, 'w') as f: f.write(content) logger.info(f"Updated file: {relative_path}") except Exception as e: raise FileOperationError(f"Failed to update file {relative_path}: {str(e)}") async def delete_file(self, relative_path: str) -> None: """Delete a project file.""" try: file_path = os.path.join(self.path, relative_path) # Security check if not os.path.normpath(file_path).startswith(str(self.path)): raise FileOperationError("Invalid file path") if os.path.exists(file_path): os.remove(file_path) logger.info(f"Deleted file: {relative_path}") else: logger.warning(f"File not found: {relative_path}") except Exception as e: raise FileOperationError(f"Failed to delete file {relative_path}: {str(e)}") async def update_state(self, **kwargs) -> None: """Update project state.""" try: # Update state object for key, value in kwargs.items(): if hasattr(self.state, key): setattr(self.state, key, value) # Save to state file state_path = os.path.join(self.path, '.mcp', 'state.json') with open(state_path, 'w') as f: json.dump(self.state.dict(), f, indent=2, default=str) logger.info(f"Updated project state: {kwargs}") except Exception as e: raise ProjectError(f"Failed to update project state: {str(e)}") async def cleanup(self) -> None: """Clean up project resources.""" try: # Stop file watchers for watcher in self._file_watchers.values(): await watcher.stop() logger.info(f"Cleaned up project resources for {self.config.name}") except Exception as e: logger.error(f"Error during project cleanup: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/manager.py: -------------------------------------------------------------------------------- ```python """Docker integration for MCP Development Server.""" import asyncio import docker from typing import Dict, Any, Optional, List from pathlib import Path import tempfile import yaml import jinja2 from ..utils.logging import setup_logging from ..utils.errors import MCPDevServerError logger = setup_logging(__name__) class DockerManager: """Manages Docker containers and environments.""" def __init__(self): """Initialize Docker manager.""" self.client = docker.from_env() self.active_containers: Dict[str, Any] = {} self._setup_template_environment() def _setup_template_environment(self): """Set up Jinja2 template environment.""" template_dir = Path(__file__).parent / "templates" self.template_env = jinja2.Environment( loader=jinja2.FileSystemLoader(str(template_dir)), autoescape=jinja2.select_autoescape() ) async def create_environment( self, name: str, image: str, project_path: str, env_vars: Optional[Dict[str, str]] = None, ports: Optional[Dict[str, str]] = None, volumes: Optional[Dict[str, Dict[str, str]]] = None ) -> str: """Create a new Docker environment. Args: name: Environment name image: Docker image name project_path: Project directory path env_vars: Environment variables ports: Port mappings volumes: Additional volume mappings Returns: str: Environment ID """ try: # Ensure image is available try: self.client.images.get(image) except docker.errors.ImageNotFound: logger.info(f"Pulling image: {image}") self.client.images.pull(image) # Setup default volumes container_volumes = { project_path: { "bind": "/workspace", "mode": "rw" } } if volumes: container_volumes.update(volumes) # Create container container = self.client.containers.run( image=image, name=f"mcp-env-{name}", detach=True, volumes=container_volumes, environment=env_vars or {}, ports=ports or {}, working_dir="/workspace", remove=True ) env_id = container.id self.active_containers[env_id] = { "name": name, "container": container, "status": "running" } logger.info(f"Created environment: {name} ({env_id})") return env_id except Exception as e: logger.error(f"Failed to create environment: {str(e)}") raise MCPDevServerError(f"Environment creation failed: {str(e)}") async def generate_dockerfile( self, template: str, variables: Dict[str, Any], output_path: Optional[str] = None ) -> str: """Generate Dockerfile from template. Args: template: Template name variables: Template variables output_path: Optional path to save Dockerfile Returns: str: Generated Dockerfile content """ try: template = self.template_env.get_template(f"{template}.dockerfile") content = template.render(**variables) if output_path: with open(output_path, "w") as f: f.write(content) return content except Exception as e: logger.error(f"Failed to generate Dockerfile: {str(e)}") raise MCPDevServerError(f"Dockerfile generation failed: {str(e)}") async def create_compose_config( self, name: str, services: Dict[str, Any], output_path: Optional[str] = None ) -> str: """Create Docker Compose configuration. Args: name: Project name services: Service configurations output_path: Optional path to save docker-compose.yml Returns: str: Generated docker-compose.yml content """ try: compose_config = { "version": "3.8", "services": services, "networks": { "mcp-network": { "driver": "bridge" } } } content = yaml.dump(compose_config, default_flow_style=False) if output_path: with open(output_path, "w") as f: f.write(content) return content except Exception as e: logger.error(f"Failed to create Docker Compose config: {str(e)}") raise MCPDevServerError(f"Compose config creation failed: {str(e)}") async def execute_command( self, env_id: str, command: str, workdir: Optional[str] = None, stream: bool = False ) -> Dict[str, Any]: """Execute command in Docker environment. Args: env_id: Environment ID command: Command to execute workdir: Working directory stream: Stream output in real-time Returns: Dict[str, Any]: Command execution results """ try: if env_id not in self.active_containers: raise MCPDevServerError(f"Environment not found: {env_id}") container = self.active_containers[env_id]["container"] exec_result = container.exec_run( command, workdir=workdir or "/workspace", stream=True ) if stream: output = [] for line in exec_result.output: decoded_line = line.decode().strip() output.append(decoded_line) yield decoded_line return { "exit_code": exec_result.exit_code, "output": output } else: output = [] for line in exec_result.output: output.append(line.decode().strip()) return { "exit_code": exec_result.exit_code, "output": output } except Exception as e: logger.error(f"Command execution failed: {str(e)}") raise MCPDevServerError(f"Command execution failed: {str(e)}") async def cleanup(self): """Clean up Docker resources.""" try: for env_id in list(self.active_containers.keys()): await self.destroy_environment(env_id) except Exception as e: logger.error(f"Docker cleanup failed: {str(e)}") raise MCPDevServerError(f"Docker cleanup failed: {str(e)}") def get_logs(self, env_id: str, tail: Optional[int] = None) -> str: """Get container logs. Args: env_id: Environment ID tail: Number of lines to return from the end Returns: str: Container logs """ try: if env_id not in self.active_containers: raise MCPDevServerError(f"Environment not found: {env_id}") container = self.active_containers[env_id]["container"] return container.logs(tail=tail).decode() except Exception as e: logger.error(f"Failed to get logs: {str(e)}") raise MCPDevServerError(f"Log retrieval failed: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/project.py: -------------------------------------------------------------------------------- ```python """Project representation and management.""" import uuid from typing import Dict, Any, Optional, List from pathlib import Path import git from pydantic import BaseModel class ProjectConfig(BaseModel): """Project configuration model.""" name: str template: str description: str = "" version: str = "0.1.0" class ProjectState: """Project state tracking.""" def __init__(self): """Initialize project state.""" self.git_initialized: bool = False self.last_build: Optional[Dict[str, Any]] = None self.last_test_run: Optional[Dict[str, Any]] = None self.active_environments: List[str] = [] class Project: """Project instance representation.""" def __init__(self, path: str, config: ProjectConfig, state: ProjectState): """Initialize project instance. Args: path: Project directory path config: Project configuration state: Project state """ self.id = str(uuid.uuid4()) self.path = path self.config = config self.state = state def get_structure(self) -> Dict[str, Any]: """Get project directory structure. Returns: Dict[str, Any]: Directory structure """ def scan_dir(path: Path) -> Dict[str, Any]: structure = {} for item in path.iterdir(): if item.name.startswith("."): continue if item.is_file(): structure[item.name] = "file" elif item.is_dir(): structure[item.name] = scan_dir(item) return structure return scan_dir(Path(self.path)) def get_git_status(self) -> Dict[str, Any]: """Get Git repository status. Returns: Dict[str, Any]: Git status information """ if not self.state.git_initialized: return {"initialized": False} try: repo = git.Repo(self.path) return { "initialized": True, "branch": repo.active_branch.name, "changed_files": [item.a_path for item in repo.index.diff(None)], "untracked_files": repo.untracked_files, "ahead": sum(1 for c in repo.iter_commits("origin/main..main")), "behind": sum(1 for c in repo.iter_commits("main..origin/main")) } except Exception as e: return { "initialized": False, "error": str(e) } async def create_git_commit(self, message: str, files: Optional[List[str]] = None) -> Dict[str, Any]: """Create a Git commit. Args: message: Commit message files: Optional list of files to commit Returns: Dict[str, Any]: Commit information """ if not self.state.git_initialized: raise ValueError("Git is not initialized for this project") try: repo = git.Repo(self.path) if files: repo.index.add(files) else: repo.index.add("*") commit = repo.index.commit(message) return { "commit_id": commit.hexsha, "message": message, "author": str(commit.author), "files": [item.a_path for item in commit.stats.files] } except Exception as e: raise ValueError(f"Failed to create commit: {str(e)}") def get_dependencies(self) -> Dict[str, Any]: """Get project dependencies. Returns: Dict[str, Any]: Dependency information """ dependencies = {} # Check Python dependencies req_file = Path(self.path) / "requirements.txt" if req_file.exists(): with open(req_file, "r") as f: dependencies["python"] = f.read().splitlines() # Check Node.js dependencies package_file = Path(self.path) / "package.json" if package_file.exists(): import json with open(package_file, "r") as f: package_data = json.load(f) dependencies["node"] = { "dependencies": package_data.get("dependencies", {}), "devDependencies": package_data.get("devDependencies", {}) } return dependencies def analyze_code(self) -> Dict[str, Any]: """Analyze project code. Returns: Dict[str, Any]: Code analysis results """ analysis = { "files": {}, "summary": { "total_files": 0, "total_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0 } } def analyze_file(path: Path) -> Dict[str, Any]: with open(path, "r", encoding="utf-8") as f: lines = f.readlines() total_lines = len(lines) blank_lines = sum(1 for line in lines if not line.strip()) comment_lines = sum(1 for line in lines if line.strip().startswith("#")) code_lines = total_lines - blank_lines - comment_lines return { "total_lines": total_lines, "code_lines": code_lines, "comment_lines": comment_lines, "blank_lines": blank_lines } for root, _, files in os.walk(self.path): for file in files: if file.endswith(".py"): file_path = Path(root) / file try: file_analysis = analyze_file(file_path) relative_path = str(file_path.relative_to(self.path)) analysis["files"][relative_path] = file_analysis # Update summary for key in ["total_lines", "code_lines", "comment_lines", "blank_lines"]: analysis["summary"][key] += file_analysis[key] analysis["summary"]["total_files"] += 1 except Exception: continue return analysis def get_test_coverage(self) -> Dict[str, Any]: """Get test coverage information. Returns: Dict[str, Any]: Test coverage data """ try: import coverage cov = coverage.Coverage() cov.load() return { "total_coverage": cov.report(), "missing_lines": dict(cov.analysis2()), "branch_coverage": cov.get_option("branch"), "excluded_lines": cov.get_exclude_list() } except Exception: return { "error": "Coverage data not available" } def get_ci_config(self) -> Dict[str, Any]: """Get CI configuration. Returns: Dict[str, Any]: CI configuration data """ ci_configs = {} # Check GitHub Actions github_dir = Path(self.path) / ".github" / "workflows" if github_dir.exists(): ci_configs["github_actions"] = [] for workflow in github_dir.glob("*.yml"): with open(workflow, "r") as f: ci_configs["github_actions"].append({ "name": workflow.stem, "config": f.read() }) # Check GitLab CI gitlab_file = Path(self.path) / ".gitlab-ci.yml" if gitlab_file.exists(): with open(gitlab_file, "r") as f: ci_configs["gitlab"] = f.read() return ci_configs async def cleanup(self): """Clean up project resources.""" # Implementation will depend on what resources need cleanup pass ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/core/server.py: -------------------------------------------------------------------------------- ```python import asyncio import json import websockets from typing import Callable, Any, Dict, Optional import logging import traceback logger = logging.getLogger(__name__) class Server: """Core server class implementing JSON-RPC 2.0 protocol.""" def __init__(self, name: str): """Initialize the server. Args: name: Server name """ self.name = name self.websocket = None self.input_request_handlers = {} self.input_response_handlers = {} self.initialized = False self.capabilities = {} async def start(self, host: str = "localhost", port: int = 8000): """Start the WebSocket server. Args: host: Host to bind to port: Port to listen on """ async def handler(websocket, path): self.websocket = websocket try: logger.info(f"New WebSocket connection from {websocket.remote_address}") async for message in websocket: response = None try: # Parse JSON-RPC message data = json.loads(message) if not isinstance(data, dict): raise ValueError("Invalid JSON-RPC message") # Handle message response = await self.handle_jsonrpc(data) except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}") response = { "jsonrpc": "2.0", "error": { "code": -32700, "message": "Parse error", "data": str(e) }, "id": None } except Exception as e: logger.error(f"Error handling message: {str(e)}", exc_info=True) response = { "jsonrpc": "2.0", "error": { "code": -32603, "message": "Internal error", "data": { "error": str(e), "traceback": traceback.format_exc() } }, "id": getattr(data, "id", None) if isinstance(data, dict) else None } # Ensure we always send a properly formatted JSON-RPC response if response: try: if not isinstance(response, dict): response = {"result": response} response["jsonrpc"] = "2.0" if isinstance(data, dict) and "id" in data: response["id"] = data["id"] # Validate JSON before sending response_str = json.dumps(response) await websocket.send(response_str) except Exception as e: logger.error(f"Error sending response: {str(e)}", exc_info=True) error_response = { "jsonrpc": "2.0", "error": { "code": -32603, "message": "Error sending response", "data": str(e) }, "id": data.get("id") if isinstance(data, dict) else None } await websocket.send(json.dumps(error_response)) except websockets.exceptions.ConnectionClosed: logger.info("WebSocket connection closed") finally: self.websocket = None try: self.server = await websockets.serve( handler, host, port, ping_interval=20, ping_timeout=20 ) logger.info(f"Server started on ws://{host}:{port}") except Exception as e: logger.error(f"Failed to start server: {str(e)}", exc_info=True) raise async def handle_jsonrpc(self, data: Dict) -> Optional[Dict]: """Handle JSON-RPC message. Args: data: Parsed JSON-RPC message Returns: Optional response message """ try: method = data.get("method") params = data.get("params", {}) logger.info(f"Handling method: {method} with params: {params}") if method == "initialize": self.capabilities = params.get("capabilities", {}) self.initialized = True return { "result": { "capabilities": self.capabilities } } if not self.initialized: return { "error": { "code": -32002, "message": "Server not initialized" } } if method == "input/request": handler = self.input_request_handlers.get("input_request") if handler: try: result = await handler( params.get("type", ""), params.get("context", {}) ) return {"result": result} except Exception as e: logger.error(f"Error in input request handler: {str(e)}", exc_info=True) return { "error": { "code": -32000, "message": str(e), "data": { "traceback": traceback.format_exc() } } } elif method == "input/response": handler = self.input_response_handlers.get("input_response") if handler: try: await handler(params) return {"result": None} except Exception as e: logger.error(f"Error in input response handler: {str(e)}", exc_info=True) return { "error": { "code": -32000, "message": str(e), "data": { "traceback": traceback.format_exc() } } } return { "error": { "code": -32601, "message": f"Method not found: {method}" } } except Exception as e: logger.error(f"Error in handle_jsonrpc: {str(e)}", exc_info=True) return { "error": { "code": -32603, "message": "Internal error", "data": { "error": str(e), "traceback": traceback.format_exc() } } } def request_input(self) -> Callable: """Decorator for input request handlers.""" def decorator(func: Callable) -> Callable: self.input_request_handlers["input_request"] = func return func return decorator def handle_input(self) -> Callable: """Decorator for input response handlers.""" def decorator(func: Callable) -> Callable: self.input_response_handlers["input_response"] = func return func return decorator ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/base_project.py: -------------------------------------------------------------------------------- ```python """Base project class definition.""" import os import uuid import xml.etree.ElementTree as ET import json import tomli from pathlib import Path from typing import Dict, Any, Optional, List import git from .project_types import ProjectType, BuildSystem from ..utils.errors import ProjectError from ..utils.logging import setup_logging logger = setup_logging(__name__) class Project: """Base project class.""" def __init__(self, path: str, config: Dict[str, Any], project_type: ProjectType): """Initialize project instance.""" self.id = str(uuid.uuid4()) self.path = path self.config = config self.project_type = project_type self.build_system = BuildSystem(config["build_system"]) def get_dependencies(self) -> Dict[str, Any]: """Get project dependencies.""" if self.build_system == BuildSystem.MAVEN: return self._get_maven_dependencies() elif self.build_system == BuildSystem.GRADLE: return self._get_gradle_dependencies() elif self.build_system in [BuildSystem.NPM, BuildSystem.YARN]: return self._get_node_dependencies() elif self.build_system == BuildSystem.POETRY: return self._get_poetry_dependencies() elif self.build_system == BuildSystem.DOTNET: return self._get_dotnet_dependencies() elif self.build_system == BuildSystem.GO: return self._get_go_dependencies() else: return {} def _get_maven_dependencies(self) -> Dict[str, Any]: """Get Maven project dependencies.""" pom_path = Path(self.path) / "pom.xml" if not pom_path.exists(): return {} try: tree = ET.parse(pom_path) root = tree.getroot() ns = {'maven': 'http://maven.apache.org/POM/4.0.0'} dependencies = [] for dep in root.findall('.//maven:dependency', ns): dependencies.append({ 'groupId': dep.find('maven:groupId', ns).text, 'artifactId': dep.find('maven:artifactId', ns).text, 'version': dep.find('maven:version', ns).text if dep.find('maven:version', ns) is not None else None, 'scope': dep.find('maven:scope', ns).text if dep.find('maven:scope', ns) is not None else 'compile' }) return {'maven': dependencies} except Exception as e: logger.error(f"Error parsing Maven dependencies: {e}") return {} def _get_node_dependencies(self) -> Dict[str, Any]: """Get Node.js project dependencies.""" package_path = Path(self.path) / "package.json" if not package_path.exists(): return {} try: with open(package_path) as f: package_data = json.load(f) return { 'dependencies': package_data.get('dependencies', {}), 'devDependencies': package_data.get('devDependencies', {}) } except Exception as e: logger.error(f"Error parsing Node.js dependencies: {e}") return {} def _get_poetry_dependencies(self) -> Dict[str, Any]: """Get Poetry project dependencies.""" pyproject_path = Path(self.path) / "pyproject.toml" if not pyproject_path.exists(): return {} try: with open(pyproject_path, "rb") as f: pyproject_data = tomli.load(f) tool_poetry = pyproject_data.get('tool', {}).get('poetry', {}) return { 'dependencies': tool_poetry.get('dependencies', {}), 'dev-dependencies': tool_poetry.get('dev-dependencies', {}) } except Exception as e: logger.error(f"Error parsing Poetry dependencies: {e}") return {} def _get_dotnet_dependencies(self) -> Dict[str, Any]: """Get .NET project dependencies.""" try: # Find all .csproj files csproj_files = list(Path(self.path).glob("**/*.csproj")) dependencies = {} for csproj in csproj_files: tree = ET.parse(csproj) root = tree.getroot() project_deps = [] for item_group in root.findall('.//PackageReference'): project_deps.append({ 'Include': item_group.get('Include'), 'Version': item_group.get('Version') }) dependencies[csproj.stem] = project_deps return dependencies except Exception as e: logger.error(f"Error parsing .NET dependencies: {e}") return {} def _get_go_dependencies(self) -> Dict[str, Any]: """Get Go project dependencies.""" go_mod_path = Path(self.path) / "go.mod" if not go_mod_path.exists(): return {} try: result = subprocess.run( ['go', 'list', '-m', 'all'], capture_output=True, text=True, cwd=self.path ) if result.returncode == 0: dependencies = [] for line in result.stdout.splitlines()[1:]: # Skip first line (module name) parts = line.split() if len(parts) >= 2: dependencies.append({ 'module': parts[0], 'version': parts[1] }) return {'modules': dependencies} except Exception as e: logger.error(f"Error parsing Go dependencies: {e}") return {} async def update_dependencies(self, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Update project dependencies.""" if self.build_system == BuildSystem.MAVEN: cmd = "mvn versions:use-latest-versions" elif self.build_system == BuildSystem.GRADLE: cmd = "./gradlew dependencyUpdates" elif self.build_system == BuildSystem.NPM: cmd = "npm update" elif self.build_system == BuildSystem.YARN: cmd = "yarn upgrade" elif self.build_system == BuildSystem.POETRY: cmd = "poetry update" elif self.build_system == BuildSystem.DOTNET: cmd = "dotnet restore" else: raise ProjectError(f"Dependency updates not supported for {self.build_system}") return await self.execute_command(cmd) async def get_project_analysis(self) -> Dict[str, Any]: """Get project analysis results.""" analysis = { "structure": self.get_structure(), "dependencies": self.get_dependencies(), "metadata": { "name": self.config["name"], "type": self.project_type.name, "build_system": self.build_system.value, "config": self.config } } # Add Git information if available git_info = self.get_git_status() if git_info.get("initialized", False): analysis["git"] = git_info # Add build/test status if available if hasattr(self, 'last_build'): analysis["last_build"] = self.last_build if hasattr(self, 'last_test_run'): analysis["last_test_run"] = self.last_test_run return analysis def get_structure(self) -> Dict[str, Any]: """Get project structure.""" def scan_dir(path: Path) -> Dict[str, Any]: structure = {} ignore_patterns = ['.git', '__pycache__', 'node_modules', 'target', 'build'] for item in path.iterdir(): if item.name in ignore_patterns: continue if item.is_file(): structure[item.name] = { "type": "file", "size": item.stat().st_size } elif item.is_dir(): structure[item.name] = { "type": "directory", "contents": scan_dir(item) } return structure return scan_dir(Path(self.path)) async def cleanup(self): """Clean up project resources.""" try: # Clean build artifacts if self.build_system == BuildSystem.MAVEN: await self.execute_command("mvn clean") elif self.build_system == BuildSystem.GRADLE: await self.execute_command("./gradlew clean") elif self.build_system == BuildSystem.NPM: await self.execute_command("npm run clean") logger.info(f"Cleaned up project: {self.config['name']}") except Exception as e: logger.error(f"Project cleanup failed: {e}") raise ProjectError(f"Cleanup failed: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/environments/workflow.py: -------------------------------------------------------------------------------- ```python """Development workflow management for environments.""" from typing import Dict, List, Optional, Any, Callable from enum import Enum import asyncio from ..utils.logging import setup_logging from ..utils.errors import WorkflowError logger = setup_logging(__name__) class TaskStatus(str, Enum): """Workflow task status.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" class Task: """Represents a workflow task.""" def __init__( self, name: str, command: str, environment: str, dependencies: Optional[List[str]] = None, timeout: Optional[int] = None, retry_count: int = 0, on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None ): self.name = name self.command = command self.environment = environment self.dependencies = dependencies or [] self.timeout = timeout self.retry_count = retry_count self.status = TaskStatus.PENDING self.result: Optional[Dict[str, Any]] = None self.on_success = on_success self.on_failure = on_failure self.attempts = 0 class Workflow: """Manages development workflows.""" def __init__(self, env_manager): self.env_manager = env_manager self.tasks: Dict[str, Task] = {} self.running = False def add_task(self, task: Task) -> None: """Add a task to the workflow.""" self.tasks[task.name] = task def remove_task(self, task_name: str) -> None: """Remove a task from the workflow.""" if task_name in self.tasks: del self.tasks[task_name] async def execute(self) -> Dict[str, Any]: """Execute the workflow.""" try: self.running = True results = {} # Build dependency graph graph = self._build_dependency_graph() # Execute tasks in order for task_group in graph: # Execute tasks in group concurrently tasks = [self._execute_task(task_name) for task_name in task_group] group_results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for task_name, result in zip(task_group, group_results): if isinstance(result, Exception): self.tasks[task_name].status = TaskStatus.FAILED results[task_name] = { "status": TaskStatus.FAILED, "error": str(result) } else: results[task_name] = result return results except Exception as e: raise WorkflowError(f"Workflow execution failed: {str(e)}") finally: self.running = False async def _execute_task(self, task_name: str) -> Dict[str, Any]: """Execute a single task.""" task = self.tasks[task_name] # Check dependencies for dep in task.dependencies: dep_task = self.tasks.get(dep) if not dep_task or dep_task.status != TaskStatus.COMPLETED: task.status = TaskStatus.SKIPPED return { "status": TaskStatus.SKIPPED, "reason": f"Dependency {dep} not satisfied" } task.status = TaskStatus.RUNNING task.attempts += 1 try: # Execute the command result = await asyncio.wait_for( self.env_manager.execute_in_environment( task.environment, task.command ), timeout=task.timeout ) # Handle execution result if result['exit_code'] == 0: task.status = TaskStatus.COMPLETED if task.on_success: await task.on_success(result) return { "status": TaskStatus.COMPLETED, "result": result } else: # Handle retry logic if task.attempts < task.retry_count + 1: logger.info(f"Retrying task {task_name} (attempt {task.attempts})") return await self._execute_task(task_name) task.status = TaskStatus.FAILED if task.on_failure: await task.on_failure(result) return { "status": TaskStatus.FAILED, "result": result } except asyncio.TimeoutError: task.status = TaskStatus.FAILED return { "status": TaskStatus.FAILED, "error": "Task timeout" } except Exception as e: task.status = TaskStatus.FAILED return { "status": TaskStatus.FAILED, "error": str(e) } def _build_dependency_graph(self) -> List[List[str]]: """Build ordered list of task groups based on dependencies.""" # Initialize variables graph: List[List[str]] = [] completed = set() remaining = set(self.tasks.keys()) while remaining: # Find tasks with satisfied dependencies group = set() for task_name in remaining: task = self.tasks[task_name] if all(dep in completed for dep in task.dependencies): group.add(task_name) if not group: # Circular dependency detected raise WorkflowError("Circular dependency detected in workflow") # Add group to graph graph.append(list(group)) completed.update(group) remaining.difference_update(group) return graph def get_status(self) -> Dict[str, Any]: """Get workflow status.""" return { "running": self.running, "tasks": { name: { "status": task.status, "attempts": task.attempts, "dependencies": task.dependencies } for name, task in self.tasks.items() } } def reset(self) -> None: """Reset workflow state.""" for task in self.tasks.values(): task.status = TaskStatus.PENDING task.attempts = 0 task.result = None self.running = False # Example workflow definitions for common development tasks class CommonWorkflows: """Predefined development workflows.""" @staticmethod def create_build_workflow(env_manager, environment: str) -> Workflow: """Create a standard build workflow.""" workflow = Workflow(env_manager) # Install dependencies workflow.add_task(Task( name="install_deps", command="npm install", environment=environment, retry_count=2 )) # Run linter workflow.add_task(Task( name="lint", command="npm run lint", environment=environment, dependencies=["install_deps"] )) # Run tests workflow.add_task(Task( name="test", command="npm run test", environment=environment, dependencies=["install_deps"] )) # Build workflow.add_task(Task( name="build", command="npm run build", environment=environment, dependencies=["lint", "test"] )) return workflow @staticmethod def create_test_workflow(env_manager, environment: str) -> Workflow: """Create a standard test workflow.""" workflow = Workflow(env_manager) # Install test dependencies workflow.add_task(Task( name="install_test_deps", command="npm install --only=dev", environment=environment, retry_count=2 )) # Run unit tests workflow.add_task(Task( name="unit_tests", command="npm run test:unit", environment=environment, dependencies=["install_test_deps"] )) # Run integration tests workflow.add_task(Task( name="integration_tests", command="npm run test:integration", environment=environment, dependencies=["install_test_deps"] )) # Generate coverage report workflow.add_task(Task( name="coverage", command="npm run coverage", environment=environment, dependencies=["unit_tests", "integration_tests"] )) return workflow ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/environments/tools.py: -------------------------------------------------------------------------------- ```python """Development tools integration for environments.""" import shutil import subprocess from typing import Dict, Optional, Any from pathlib import Path from ..utils.logging import setup_logging from ..utils.errors import ToolError logger = setup_logging(__name__) class ToolManager: """Manages development tools in environments.""" def __init__(self, env_manager): self.env_manager = env_manager async def setup_package_manager( self, environment: str, package_manager: str, config: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Set up package manager in an environment.""" try: config = config or {} if package_manager == "npm": return await self._setup_npm(environment, config) elif package_manager == "pip": return await self._setup_pip(environment, config) else: raise ToolError(f"Unsupported package manager: {package_manager}") except Exception as e: raise ToolError(f"Failed to setup package manager: {str(e)}") async def setup_build_tool( self, environment: str, build_tool: str, config: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Set up build tool in an environment.""" try: config = config or {} if build_tool == "webpack": return await self._setup_webpack(environment, config) elif build_tool == "vite": return await self._setup_vite(environment, config) else: raise ToolError(f"Unsupported build tool: {build_tool}") except Exception as e: raise ToolError(f"Failed to setup build tool: {str(e)}") async def setup_test_framework( self, environment: str, test_framework: str, config: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Set up testing framework in an environment.""" try: config = config or {} if test_framework == "jest": return await self._setup_jest(environment, config) elif test_framework == "pytest": return await self._setup_pytest(environment, config) else: raise ToolError(f"Unsupported test framework: {test_framework}") except Exception as e: raise ToolError(f"Failed to setup test framework: {str(e)}") async def _setup_npm(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up NPM package manager.""" try: # Initialize package.json if needed if not config.get('skip_init'): result = await self.env_manager.execute_in_environment( environment, 'npm init -y' ) if result['exit_code'] != 0: raise ToolError(f"npm init failed: {result['error']}") # Install dependencies if specified if deps := config.get('dependencies'): deps_str = ' '.join(deps) result = await self.env_manager.execute_in_environment( environment, f'npm install {deps_str}' ) if result['exit_code'] != 0: raise ToolError(f"npm install failed: {result['error']}") return {"status": "success"} except Exception as e: raise ToolError(f"NPM setup failed: {str(e)}") async def _setup_pip(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up Pip package manager.""" try: # Create virtual environment if needed if not config.get('skip_venv'): result = await self.env_manager.execute_in_environment( environment, 'python -m venv .venv' ) if result['exit_code'] != 0: raise ToolError(f"venv creation failed: {result['error']}") # Install dependencies if specified if deps := config.get('dependencies'): deps_str = ' '.join(deps) result = await self.env_manager.execute_in_environment( environment, f'pip install {deps_str}' ) if result['exit_code'] != 0: raise ToolError(f"pip install failed: {result['error']}") return {"status": "success"} except Exception as e: raise ToolError(f"Pip setup failed: {str(e)}") async def _setup_webpack(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up Webpack build tool.""" try: # Install webpack and dependencies result = await self.env_manager.execute_in_environment( environment, 'npm install webpack webpack-cli --save-dev' ) if result['exit_code'] != 0: raise ToolError(f"webpack installation failed: {result['error']}") # Create webpack config if not exists config_content = """ const path = require('path'); module.exports = { entry: './src/index.js', output: { path: path.resolve(__dirname, 'dist'), filename: 'bundle.js' } }; """ config_path = Path(self.env_manager.environments[environment]['path']) / 'webpack.config.js' config_path.write_text(config_content) return {"status": "success"} except Exception as e: raise ToolError(f"Webpack setup failed: {str(e)}") async def _setup_vite(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up Vite build tool.""" try: # Install vite result = await self.env_manager.execute_in_environment( environment, 'npm install vite --save-dev' ) if result['exit_code'] != 0: raise ToolError(f"vite installation failed: {result['error']}") # Create vite config if not exists config_content = """ export default { root: 'src', build: { outDir: '../dist' } } """ config_path = Path(self.env_manager.environments[environment]['path']) / 'vite.config.js' config_path.write_text(config_content) return {"status": "success"} except Exception as e: raise ToolError(f"Vite setup failed: {str(e)}") async def _setup_jest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up Jest testing framework.""" try: # Install jest and dependencies result = await self.env_manager.execute_in_environment( environment, 'npm install jest @types/jest --save-dev' ) if result['exit_code'] != 0: raise ToolError(f"jest installation failed: {result['error']}") # Create jest config if not exists config_content = """ module.exports = { testEnvironment: 'node', testMatch: ['**/*.test.js'], collectCoverage: true }; """ config_path = Path(self.env_manager.environments[environment]['path']) / 'jest.config.js' config_path.write_text(config_content) return {"status": "success"} except Exception as e: raise ToolError(f"Jest setup failed: {str(e)}") async def _setup_pytest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]: """Set up Pytest testing framework.""" try: # Install pytest and dependencies result = await self.env_manager.execute_in_environment( environment, 'pip install pytest pytest-cov' ) if result['exit_code'] != 0: raise ToolError(f"pytest installation failed: {result['error']}") # Create pytest config if not exists config_content = """ [pytest] testpaths = tests python_files = test_*.py addopts = --cov=src """ config_path = Path(self.env_manager.environments[environment]['path']) / 'pytest.ini' config_path.write_text(config_content) return {"status": "success"} except Exception as e: raise ToolError(f"Pytest setup failed: {str(e)}") ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/project_manager/manager.py: -------------------------------------------------------------------------------- ```python """Project management system for MCP Development Server.""" import asyncio import json from pathlib import Path from typing import Dict, Any, Optional, List import git from .project_types import PROJECT_TYPES, ProjectType, BuildSystem from .templates import TemplateManager from ..prompts.project_templates import PROJECT_TEMPLATES from ..utils.logging import setup_logging from ..utils.errors import ProjectError from ..docker.manager import DockerManager logger = setup_logging(__name__) class ProjectManager: """Manages development projects.""" def __init__(self, config): """Initialize project manager. Args: config: Server configuration instance """ self.config = config self.template_manager = TemplateManager() self.docker_manager = DockerManager() self.current_project = None self.projects = {} def get_available_project_types(self) -> Dict[str, Dict[str, Any]]: """Get list of available project types. Returns: Dict[str, Dict[str, Any]]: Project type information """ return { name: { "name": pt.name, "description": pt.description, "build_systems": [bs.value for bs in pt.build_systems], "default_build_system": pt.default_build_system.value } for name, pt in PROJECT_TYPES.items() } async def create_project( self, name: str, project_type: str, project_config: Dict[str, Any], path: Optional[str] = None, description: str = "" ) -> Any: """Create a new project. Args: name: Project name project_type: Type of project (e.g., java, dotnet, node) project_config: Project-specific configuration path: Project directory path (optional) description: Project description Returns: Project instance """ try: if project_type not in PROJECT_TYPES: raise ProjectError(f"Unsupported project type: {project_type}") project_type_info = PROJECT_TYPES[project_type] # Determine project path if not path: projects_dir = Path(self.config.get("projectsDir")) path = str(projects_dir / name) project_path = Path(path) if project_path.exists(): raise ProjectError(f"Project path already exists: {path}") # Create project directory project_path.mkdir(parents=True, exist_ok=True) # Create project configuration project_config.update({ "name": name, "type": project_type, "description": description, "build_system": project_config.get("build_system", project_type_info.default_build_system.value) }) # Save project configuration config_path = project_path / "project.json" with open(config_path, "w") as f: json.dump(project_config, f, indent=2) # Create project structure await self._create_project_structure(project_path, project_type_info) # Initialize build system await self._initialize_build_system( project_path, project_type_info, project_config ) # Set up Docker environment if requested if project_config.get("setup_docker", False): await self._setup_docker_environment( project_path, project_type_info, project_config ) # Initialize Git repository if requested if project_config.get("initialize_git", True): repo = git.Repo.init(path) repo.index.add("*") repo.index.commit("Initial commit") # Create project instance project = await self._create_project_instance( path, project_config, project_type_info ) # Store project reference self.projects[project.id] = project self.current_project = project logger.info(f"Created {project_type} project: {name} at {path}") return project except Exception as e: logger.error(f"Failed to create project: {str(e)}") raise ProjectError(f"Project creation failed: {str(e)}") async def _create_project_structure( self, project_path: Path, project_type: ProjectType ): """Create project directory structure. Args: project_path: Project directory path project_type: Project type information """ def create_directory_structure(base_path: Path, structure: Dict[str, Any]): for name, content in structure.items(): path = base_path / name if isinstance(content, dict): path.mkdir(exist_ok=True) create_directory_structure(path, content) create_directory_structure(project_path, project_type.file_structure) async def _initialize_build_system( self, project_path: Path, project_type: ProjectType, project_config: Dict[str, Any] ): """Initialize project build system. Args: project_path: Project directory path project_type: Project type information project_config: Project configuration """ build_system = BuildSystem(project_config["build_system"]) # Generate build system configuration files if build_system == BuildSystem.MAVEN: await self.template_manager.generate_maven_pom( project_path, project_config ) elif build_system == BuildSystem.GRADLE: await self.template_manager.generate_gradle_build( project_path, project_config ) elif build_system == BuildSystem.DOTNET: await self.template_manager.generate_dotnet_project( project_path, project_config ) elif build_system in [BuildSystem.NPM, BuildSystem.YARN]: await self.template_manager.generate_package_json( project_path, project_config ) elif build_system == BuildSystem.POETRY: await self.template_manager.generate_pyproject_toml( project_path, project_config ) async def _setup_docker_environment( self, project_path: Path, project_type: ProjectType, project_config: Dict[str, Any] ): """Set up Docker environment for the project. Args: project_path: Project directory path project_type: Project type information project_config: Project configuration """ # Generate Dockerfile from template dockerfile_template = project_type.docker_templates[0] # Use first template dockerfile_content = await self.docker_manager.generate_dockerfile( dockerfile_template, project_config ) dockerfile_path = project_path / "Dockerfile" with open(dockerfile_path, "w") as f: f.write(dockerfile_content) # Generate docker-compose.yml if needed if project_config.get("use_docker_compose", False): services = { "app": { "build": ".", "volumes": [ "./:/workspace" ], "environment": project_type.environment_variables } } compose_content = await self.docker_manager.create_compose_config( project_config["name"], services, project_path / "docker-compose.yml" ) async def _create_project_instance( self, path: str, config: Dict[str, Any], project_type: ProjectType ) -> Any: """Create project instance based on type. Args: path: Project directory path config: Project configuration project_type: Project type information Returns: Project instance """ # Import appropriate project class based on type if project_type.name == "java": from .java_project import JavaProject return JavaProject(path, config, project_type) elif project_type.name == "dotnet": from .dotnet_project import DotNetProject return DotNetProject(path, config, project_type) elif project_type.name == "node": from .node_project import NodeProject return NodeProject(path, config, project_type) elif project_type.name == "python": from .python_project import PythonProject return PythonProject(path, config, project_type) elif project_type.name == "golang": from .golang_project import GolangProject return GolangProject(path, config, project_type) else: from .base_project import Project return Project(path, config, project_type) ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/workflow/manager.py: -------------------------------------------------------------------------------- ```python """Development workflow management for MCP Development Server.""" from typing import Dict, List, Optional, Any from enum import Enum from datetime import datetime import asyncio from ..utils.errors import WorkflowError from ..utils.logging import setup_logging logger = setup_logging(__name__) class WorkflowStatus(str, Enum): """Workflow execution status.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class WorkflowStep: """Individual step in a workflow.""" def __init__( self, name: str, command: str, environment: str, depends_on: Optional[List[str]] = None, timeout: Optional[int] = None, retry_count: int = 0 ): self.name = name self.command = command self.environment = environment self.depends_on = depends_on or [] self.timeout = timeout self.retry_count = retry_count self.status = WorkflowStatus.PENDING self.result: Optional[Dict[str, Any]] = None self.attempts = 0 class WorkflowManager: """Manages development workflows.""" def __init__(self, env_manager): self.env_manager = env_manager self.workflows: Dict[str, Dict[str, Any]] = {} async def create_workflow( self, steps: List[WorkflowStep], config: Optional[Dict[str, Any]] = None ) -> str: """Create a new workflow.""" try: workflow_id = f"workflow_{len(self.workflows)}" # Initialize workflow self.workflows[workflow_id] = { "steps": steps, "config": config or {}, "status": WorkflowStatus.PENDING, "start_time": None, "end_time": None } return workflow_id except Exception as e: raise WorkflowError(f"Failed to create workflow: {str(e)}") async def start_workflow(self, workflow_id: str) -> None: """Start workflow execution.""" try: if workflow := self.workflows.get(workflow_id): workflow["status"] = WorkflowStatus.RUNNING workflow["start_time"] = datetime.now() # Execute workflow steps asyncio.create_task(self._execute_workflow(workflow_id)) else: raise WorkflowError(f"Workflow not found: {workflow_id}") except Exception as e: raise WorkflowError(f"Failed to start workflow: {str(e)}") async def _execute_workflow(self, workflow_id: str) -> None: """Execute workflow steps in order.""" workflow = self.workflows[workflow_id] try: # Build execution graph graph = self._build_execution_graph(workflow["steps"]) # Execute steps in dependency order for step_group in graph: results = await asyncio.gather( *[self._execute_step(workflow_id, step) for step in step_group], return_exceptions=True ) # Check for failures if any(isinstance(r, Exception) for r in results): workflow["status"] = WorkflowStatus.FAILED return workflow["status"] = WorkflowStatus.COMPLETED except Exception as e: logger.error(f"Workflow execution error: {str(e)}") workflow["status"] = WorkflowStatus.FAILED workflow["error"] = str(e) finally: workflow["end_time"] = datetime.now() async def _execute_step( self, workflow_id: str, step: WorkflowStep ) -> None: """Execute a single workflow step.""" try: step.status = WorkflowStatus.RUNNING step.attempts += 1 # Execute step command result = await asyncio.wait_for( self.env_manager.execute_in_environment( step.environment, step.command ), timeout=step.timeout ) # Handle step result success = result["exit_code"] == 0 step.result = { "output": result["output"], "error": result.get("error"), "exit_code": result["exit_code"] } if success: step.status = WorkflowStatus.COMPLETED else: # Handle retry logic if step.attempts < step.retry_count + 1: logger.info(f"Retrying step {step.name} (attempt {step.attempts})") return await self._execute_step(workflow_id, step) step.status = WorkflowStatus.FAILED except asyncio.TimeoutError: step.status = WorkflowStatus.FAILED step.result = { "error": "Step execution timed out" } except Exception as e: step.status = WorkflowStatus.FAILED step.result = { "error": str(e) } def _build_execution_graph( self, steps: List[WorkflowStep] ) -> List[List[WorkflowStep]]: """Build ordered list of step groups based on dependencies.""" # Initialize variables graph: List[List[WorkflowStep]] = [] completed = set() remaining = set(step.name for step in steps) steps_by_name = {step.name: step for step in steps} while remaining: # Find steps with satisfied dependencies group = set() for step_name in remaining: step = steps_by_name[step_name] if all(dep in completed for dep in step.depends_on): group.add(step_name) if not group: # Circular dependency detected raise WorkflowError("Circular dependency detected in workflow steps") # Add group to graph graph.append([steps_by_name[name] for name in group]) completed.update(group) remaining.difference_update(group) return graph async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]: """Get status and results of a workflow.""" if workflow := self.workflows.get(workflow_id): return { "id": workflow_id, "status": workflow["status"], "steps": [ { "name": step.name, "status": step.status, "result": step.result, "attempts": step.attempts } for step in workflow["steps"] ], "start_time": workflow["start_time"], "end_time": workflow["end_time"], "error": workflow.get("error") } raise WorkflowError(f"Workflow not found: {workflow_id}") def get_common_workflows(self) -> Dict[str, List[WorkflowStep]]: """Get predefined common workflow templates.""" return { "build": [ WorkflowStep( name="install", command="npm install", environment="default" ), WorkflowStep( name="lint", command="npm run lint", environment="default", depends_on=["install"] ), WorkflowStep( name="test", command="npm test", environment="default", depends_on=["install"] ), WorkflowStep( name="build", command="npm run build", environment="default", depends_on=["lint", "test"] ) ], "test": [ WorkflowStep( name="install_deps", command="npm install", environment="default" ), WorkflowStep( name="unit_tests", command="npm run test:unit", environment="default", depends_on=["install_deps"] ), WorkflowStep( name="integration_tests", command="npm run test:integration", environment="default", depends_on=["install_deps"] ), WorkflowStep( name="coverage", command="npm run coverage", environment="default", depends_on=["unit_tests", "integration_tests"] ) ], "release": [ WorkflowStep( name="bump_version", command="npm version patch", environment="default" ), WorkflowStep( name="build", command="npm run build", environment="default", depends_on=["bump_version"] ), WorkflowStep( name="publish", command="npm publish", environment="default", depends_on=["build"] ) ] } ``` -------------------------------------------------------------------------------- /src/mcp_dev_server/docker/streams.py: -------------------------------------------------------------------------------- ```python """Container output streaming and file synchronization.""" import os import time import asyncio import hashlib import collections from enum import Enum from datetime import datetime from typing import Dict, List, Optional, AsyncGenerator, Any from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from ..utils.logging import setup_logging from ..utils.errors import StreamError, SyncError logger = setup_logging(__name__) class OutputFormat(str, Enum): """Output stream formats.""" STDOUT = "stdout" STDERR = "stderr" COMBINED = "combined" FORMATTED = "formatted" class StreamConfig: """Stream configuration.""" def __init__( self, format: OutputFormat = OutputFormat.COMBINED, buffer_size: int = 1024, filters: Optional[List[str]] = None, timestamp: bool = False ): self.format = format self.buffer_size = buffer_size self.filters = filters or [] self.timestamp = timestamp class SyncConfig: """Synchronization configuration.""" def __init__( self, ignore_patterns: Optional[List[str]] = None, sync_interval: float = 1.0, atomic: bool = True ): self.ignore_patterns = ignore_patterns or [] self.sync_interval = sync_interval self.atomic = atomic class StreamInfo: """Information about an active stream.""" def __init__(self, task: asyncio.Task, config: StreamConfig): self.task = task self.config = config self.start_time = datetime.now() class EnhancedOutputStreamManager: """Enhanced streaming output manager.""" def __init__(self, docker_manager): self.docker_manager = docker_manager self.active_streams: Dict[str, StreamInfo] = {} self._buffer = collections.deque(maxlen=1000) # Keep last 1000 messages async def start_stream( self, container_name: str, command: str, config: StreamConfig, callback: Optional[callable] = None ) -> AsyncGenerator[str, None]: """Start enhanced output stream.""" try: container = self.docker_manager.containers.get(container_name) if not container: raise StreamError(f"Container not found: {container_name}") # Create execution with specified format exec_result = container.exec_run( command, stream=True, demux=True, socket=True # Use socket for better streaming ) async def stream_handler(): buffer = [] try: async for data in exec_result.output: # Apply format and filtering processed_data = self._process_stream_data(data, config) if processed_data: buffer.extend(processed_data) if len(buffer) >= config.buffer_size: output = ''.join(buffer) buffer.clear() self._buffer.append(output) if callback: await callback(output) yield output except Exception as e: logger.error(f"Stream processing error: {str(e)}") raise StreamError(f"Stream processing error: {str(e)}") finally: if buffer: output = ''.join(buffer) self._buffer.append(output) if callback: await callback(output) yield output if container_name in self.active_streams: del self.active_streams[container_name] # Create and store stream task stream_task = asyncio.create_task(stream_handler()) self.active_streams[container_name] = StreamInfo(stream_task, config) async for output in stream_task: yield output except Exception as e: logger.error(f"Failed to start stream: {str(e)}") raise StreamError(f"Failed to start stream: {str(e)}") def _process_stream_data( self, data: bytes, config: StreamConfig ) -> Optional[str]: """Process stream data according to config.""" if not data: return None # Split streams if demuxed stdout, stderr = data if isinstance(data, tuple) else (data, None) # Apply format if config.format == OutputFormat.STDOUT and stdout: output = stdout.decode() elif config.format == OutputFormat.STDERR and stderr: output = stderr.decode() elif config.format == OutputFormat.COMBINED: output = '' if stdout: output += stdout.decode() if stderr: output += stderr.decode() elif config.format == OutputFormat.FORMATTED: output = self._format_output(stdout, stderr) else: return None # Apply filters for filter_pattern in config.filters: if filter_pattern in output: return None # Add timestamp if requested if config.timestamp: output = f"[{datetime.now().isoformat()}] {output}" return output @staticmethod def _format_output(stdout: Optional[bytes], stderr: Optional[bytes]) -> str: """Format output with colors and prefixes.""" output = [] if stdout: output.append(f"\033[32m[OUT]\033[0m {stdout.decode()}") if stderr: output.append(f"\033[31m[ERR]\033[0m {stderr.decode()}") return '\n'.join(output) async def stop_stream(self, container_name: str) -> None: """Stop streaming from a container.""" if stream_info := self.active_streams.get(container_name): stream_info.task.cancel() try: await stream_info.task except asyncio.CancelledError: pass del self.active_streams[container_name] class BiDirectionalSync: """Enhanced bi-directional file synchronization.""" def __init__(self, docker_manager): self.docker_manager = docker_manager self.sync_handlers: Dict[str, EnhancedSyncHandler] = {} self.observer = Observer() self.observer.start() async def start_sync( self, container_name: str, host_path: str, container_path: str, config: SyncConfig ) -> None: """Start bi-directional file sync.""" try: # Validate paths if not os.path.exists(host_path): raise SyncError(f"Host path does not exist: {host_path}") container = self.docker_manager.containers.get(container_name) if not container: raise SyncError(f"Container not found: {container_name}") # Create sync handler handler = EnhancedSyncHandler( container=container, container_path=container_path, host_path=host_path, config=config ) # Start watching both directions self.observer.schedule( handler, host_path, recursive=True ) # Start container file watcher await handler.start_container_watcher() self.sync_handlers[container_name] = handler logger.info(f"Started bi-directional sync for container: {container_name}") except Exception as e: raise SyncError(f"Failed to start sync: {str(e)}") async def stop_sync(self, container_name: str) -> None: """Stop synchronization for a container.""" if handler := self.sync_handlers.get(container_name): self.observer.unschedule_all() await handler.stop_container_watcher() del self.sync_handlers[container_name] logger.info(f"Stopped sync for container: {container_name}") async def cleanup(self) -> None: """Clean up all synchronization handlers.""" for container_name in list(self.sync_handlers.keys()): await self.stop_sync(container_name) self.observer.stop() self.observer.join() class EnhancedSyncHandler(FileSystemEventHandler): """Enhanced sync handler with bi-directional support.""" def __init__( self, container, container_path: str, host_path: str, config: SyncConfig ): super().__init__() self.container = container self.container_path = container_path self.host_path = host_path self.config = config self.sync_lock = asyncio.Lock() self.pending_syncs: Dict[str, float] = {} self._container_watcher: Optional[asyncio.Task] = None async def start_container_watcher(self) -> None: """Start watching container files.""" cmd = f""" inotifywait -m -r -e modify,create,delete,move {self.container_path} """ exec_result = self.container.exec_run( cmd, stream=True, detach=True ) self._container_watcher = asyncio.create_task( self._handle_container_events(exec_result.output) ) async def stop_container_watcher(self) -> None: """Stop container file watcher.""" if self._container_watcher: self._container_watcher.cancel() try: await self._container_watcher except asyncio.CancelledError: pass self._container_watcher = None async def _handle_container_events(self, output_stream: AsyncGenerator) -> None: """Handle container file events.""" try: async for event in output_stream: await self._handle_container_change(event.decode()) except Exception as e: logger.error(f"Container watcher error: {str(e)}") async def _handle_container_change(self, event: str) -> None: """Handle container file change.""" try: # Parse inotify event parts = event.strip().split() if len(parts) >= 3: path = parts[0] change_type = parts[1] filename = parts[2] container_path = os.path.join(path, filename) host_path = self._container_to_host_path(container_path) # Apply filters if self._should_ignore(host_path): return async with self.sync_lock: # Check if change is from host sync if host_path in self.pending_syncs: if time.time() - self.pending_syncs[host_path] < self.config.sync_interval: return # Sync from container to host await self._sync_to_host(container_path, host_path) except Exception as e: logger.error(f"Error handling container change: {str(e)}") def _container_to_host_path(self, container_path: str) -> str: """Convert container path to host path.""" rel_path = os.path.relpath(container_path, self.container_path) return os.path.join(self.host_path, rel_path) def _should_ignore(self, path: str) -> bool: """Check if path should be ignored.""" return any(pattern in path for pattern in self.config.ignore_patterns) async def _sync_to_host( self, container_path: str, host_path: str ) -> None: """Sync file from container to host.""" try: # Get file from container stream, stat = self.container.get_archive(container_path) # Create parent directories os.makedirs(os.path.dirname(host_path), exist_ok=True) if self.config.atomic: # Save file atomically using temporary file tmp_path = f"{host_path}.tmp" with open(tmp_path, 'wb') as f: for chunk in stream: f.write(chunk) os.rename(tmp_path, host_path) else: # Direct write with open(host_path, 'wb') as f: for chunk in stream: f.write(chunk) # Update sync tracking self.pending_syncs[host_path] = time.time() except Exception as e: logger.error(f"Error syncing to host: {str(e)}") raise SyncError(f"Failed to sync file {container_path}: {str(e)}") ```