# Directory Structure

```
├── .gitignore
├── .python-version
├── package.json
├── pyproject.toml
├── README.md
├── setup.py
├── src
│   ├── mcp_dev_server
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── core
│   │   │   ├── __init__.py
│   │   │   └── server.py
│   │   ├── docker
│   │   │   ├── manager.py
│   │   │   ├── streams.py
│   │   │   ├── templates
│   │   │   │   ├── dev.dockerfile
│   │   │   │   ├── node.dockerfile
│   │   │   │   └── python.dockerfile
│   │   │   ├── templates.py
│   │   │   ├── volumes.py
│   │   │   └── xxx.py
│   │   ├── environments
│   │   │   ├── manager.py
│   │   │   ├── tools.py
│   │   │   └── workflow.py
│   │   ├── handlers
│   │   │   ├── __init__.py
│   │   │   └── input_request_handler.py
│   │   ├── managers
│   │   │   ├── __init__.py
│   │   │   ├── base_manager.py
│   │   │   ├── build_manager.py
│   │   │   ├── dependency_manager.py
│   │   │   ├── project_manager.py
│   │   │   ├── template_manager.py
│   │   │   ├── test_manager.py
│   │   │   └── workflow_manager.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── config.py
│   │   │   ├── errors.py
│   │   │   └── input_response.py
│   │   ├── package
│   │   │   └── manager.py
│   │   ├── project_manager
│   │   │   ├── base_project.py
│   │   │   ├── context.py
│   │   │   ├── git.py
│   │   │   ├── manager.py
│   │   │   ├── project_types.py
│   │   │   ├── project.py
│   │   │   └── templates.py
│   │   ├── prompts
│   │   │   ├── handler.py
│   │   │   ├── input_protocol.py
│   │   │   ├── project_templates.py
│   │   │   └── templates.py
│   │   ├── server.py
│   │   ├── test
│   │   │   └── manager.py
│   │   ├── utils
│   │   │   ├── __init__.py
│   │   │   ├── config.py
│   │   │   ├── errors.py
│   │   │   └── logging.py
│   │   └── workflow
│   │       └── manager.py
│   └── resources
│       └── templates
│           └── basic
│               └── files
├── tests
│   └── test_integration.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.env
.venv
env/
venv/
ENV/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Project specific
*.log
.docker/
.pytest_cache/
.coverage
htmlcov/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

````markdown
# MCP Development Server

A Model Context Protocol (MCP) server that enables Claude to manage software development projects, providing complete project context awareness and handling code execution through Docker environments.

## Features

### Core Infrastructure
- Project context management
- File system operations
- Template-based project creation
- Git integration

### Requirements
- Python 3.12 or higher
- Docker
- Git

## Installation

```bash
# Using pip
pip install mcp-dev-server

# Development installation
git clone https://github.com/your-org/mcp-dev-server.git
cd mcp-dev-server
pip install -e .
```

## Configuration

### Claude Desktop Configuration

Add to your Claude Desktop configuration file:

On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`

```json
{
  "mcpServers": {
    "dev": {
      "command": "mcp-dev-server",
      "args": []
    }
  }
}
```

## Usage

The server provides several MCP capabilities:

### Resources
- Project structure and files
- Build status and artifacts
- Test results
- Docker container status

### Tools
- Project initialization
- Build operations
- Test execution
- Docker commands

### Prompts
- Project analysis
- Development suggestions
- Error diagnosis

## Development

### Setting up development environment

```bash
# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
pip install -e ".[dev]"
```

### Running tests

```bash
pytest tests/
```

## Contributing

Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct and the process for submitting pull requests.

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
````

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/xxx.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_dev_server/core/__init__.py:
--------------------------------------------------------------------------------

```python
from .server import Server

__all__ = ['Server']
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/input_protocol.py:
--------------------------------------------------------------------------------

```python
"""Input request protocol for MCP server."""
[Previous content...]
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/handlers/__init__.py:
--------------------------------------------------------------------------------

```python
from .input_request_handler import InputRequestHandler

__all__ = ['InputRequestHandler']
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/models/errors.py:
--------------------------------------------------------------------------------

```python
class MCPDevServerError(Exception):
    """Base exception class for MCP Development Server errors."""
    pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/__main__.py:
--------------------------------------------------------------------------------

```python
"""Main entry point when run with python -m mcp_dev_server"""
from . import main

if __name__ == '__main__':
    main()
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/test_manager.py:
--------------------------------------------------------------------------------

```python
class TestManager:
    """Manager class for test-related operations."""
    
    def __init__(self):
        """Initialize the test manager."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/build_manager.py:
--------------------------------------------------------------------------------

```python
class BuildManager:
    """Manager class for build-related operations."""
    
    def __init__(self):
        """Initialize the build manager."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/models/__init__.py:
--------------------------------------------------------------------------------

```python
from .config import Config
from .input_response import InputResponse
from .errors import MCPDevServerError

__all__ = ['Config', 'InputResponse', 'MCPDevServerError']
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/template_manager.py:
--------------------------------------------------------------------------------

```python
class TemplateManager:
    """Manager class for template-related operations."""
    
    def __init__(self):
        """Initialize the template manager."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/workflow_manager.py:
--------------------------------------------------------------------------------

```python
class WorkflowManager:
    """Manager class for workflow-related operations."""
    
    def __init__(self):
        """Initialize the workflow manager."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/dependency_manager.py:
--------------------------------------------------------------------------------

```python
class DependencyManager:
    """Manager class for dependency-related operations."""
    
    def __init__(self):
        """Initialize the dependency manager."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/project_manager.py:
--------------------------------------------------------------------------------

```python
from ..models import Config

class ProjectManager:
    """Manager class for project-related operations."""
    
    def __init__(self, config: Config):
        """Initialize the project manager.
        
        Args:
            config: Server configuration
        """
        self.config = config
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/__init__.py:
--------------------------------------------------------------------------------

```python
from .project_manager import ProjectManager
from .template_manager import TemplateManager
from .build_manager import BuildManager
from .dependency_manager import DependencyManager
from .test_manager import TestManager
from .workflow_manager import WorkflowManager

__all__ = [
    'ProjectManager',
    'TemplateManager',
    'BuildManager',
    'DependencyManager',
    'TestManager',
    'WorkflowManager'
]
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/managers/base_manager.py:
--------------------------------------------------------------------------------

```python
"""Base manager class with common functionality."""
import uuid
from typing import Dict, Any

class BaseManager:
    """Base class for all managers."""
    
    def _generate_id(self) -> str:
        """Generate a unique identifier.
        
        Returns:
            str: Unique identifier
        """
        return str(uuid.uuid4())
        
    async def cleanup(self):
        """Clean up resources. Override in subclasses."""
        pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/__init__.py:
--------------------------------------------------------------------------------

```python
"""MCP Development Server Package."""
from . import server
import asyncio
from typing import Optional
from .utils.logging import setup_logging

logger = setup_logging(__name__)

def main():
    """Main entry point for the package."""
    try:
        server_instance = server.MCPDevServer()
        asyncio.run(server_instance.run())
    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
    except Exception as e:
        logger.error(f"Server error: {str(e)}")
        raise

# Expose key components at package level
__all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
from setuptools import setup, find_packages

setup(
    name="mcp-dev-server",
    version="0.1.0",
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    install_requires=[
        "mcp",            # Base MCP package
        "aiohttp>=3.8.0",
        "websockets>=10.0",
        "uvicorn>=0.15.0",
        "fastapi>=0.68.0",
        "typing_extensions>=4.5.0",
    ],
    entry_points={
        "console_scripts": [
            "mcp-dev-server=mcp_dev_server:main",
        ],
    },
    python_requires=">=3.12",
    author="Your Name",
    description="MCP Development Server"
)
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/models/input_response.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict

class InputResponse:
    """Class representing a user's input response."""
    
    def __init__(self, request_id: str, values: Dict[str, Any]):
        """Initialize an input response.
        
        Args:
            request_id: ID of the input request
            values: Dictionary of input values
        """
        self.request_id = request_id
        self.values = values
        
    def validate(self) -> bool:
        """Validate the input response.
        
        Returns:
            bool: True if valid, False otherwise
        """
        return True  # TODO: Implement validation
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/errors.py:
--------------------------------------------------------------------------------

```python
"""Error definitions for MCP Development Server."""

class MCPDevServerError(Exception):
    """Base error class for MCP Development Server."""
    pass

class ProjectError(MCPDevServerError):
    """Project-related errors."""
    pass

class BuildError(MCPDevServerError):
    """Build-related errors."""
    pass

class TestError(MCPDevServerError):
    """Test-related errors."""
    pass

class EnvironmentError(MCPDevServerError):
    """Environment-related errors."""
    pass

class ConfigurationError(MCPDevServerError):
    """Configuration-related errors."""
    pass

class WorkflowError(MCPDevServerError):
    """Workflow-related errors."""
    pass

class DockerError(MCPDevServerError):
    """Docker-related errors."""
    pass

class GitError(MCPDevServerError):
    """Git-related errors."""
    pass

class PackageError(MCPDevServerError):
    """Package management errors."""
    pass
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/node.dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Node.js development environment
FROM node:{{ node_version }}

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /workspace

{% if package_file %}
# Install Node.js dependencies
COPY {{ package_file }} .
{% if package_lock %}
COPY {{ package_lock }} .
RUN npm ci
{% else %}
RUN npm install
{% endif %}
{% endif %}

{% if global_packages %}
# Install global packages
RUN npm install -g {% for package in global_packages %}{{ package }} {% endfor %}
{% endif %}

# Set Node.js environment variables
ENV NODE_ENV=development

{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/models/config.py:
--------------------------------------------------------------------------------

```python
class Config:
    """Configuration class for MCP Development Server."""
    
    def __init__(self):
        """Initialize configuration with default values."""
        self.host = "localhost"
        self.port = 8000
        self.debug = False
        
    def load_from_file(self, file_path: str):
        """Load configuration from a file.
        
        Args:
            file_path: Path to configuration file
        """
        pass  # TODO: Implement configuration loading
        
    def save_to_file(self, file_path: str):
        """Save current configuration to a file.
        
        Args:
            file_path: Path to save configuration
        """
        pass  # TODO: Implement configuration saving
```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "mcp-dev-server",
  "version": "1.0.0",
  "description": "Model Context Protocol Development Server",
  "main": "dist/app.js",
  "scripts": {
    "start": "node dist/app.js",
    "dev": "nodemon src/app.ts",
    "build": "tsc",
    "test": "jest",
    "lint": "eslint . --ext .ts"
  },
  "dependencies": {
    "express": "^4.18.2",
    "typescript": "^5.0.0",
    "mongoose": "^7.0.0",
    "dotenv": "^16.0.0",
    "winston": "^3.8.0",
    "cors": "^2.8.5",
    "helmet": "^6.0.0",
    "joi": "^17.0.0"
  },
  "devDependencies": {
    "@types/express": "^4.17.17",
    "@types/node": "^18.0.0",
    "@types/jest": "^29.0.0",
    "@typescript-eslint/eslint-plugin": "^5.0.0",
    "@typescript-eslint/parser": "^5.0.0",
    "eslint": "^8.0.0",
    "jest": "^29.0.0",
    "nodemon": "^2.0.0",
    "ts-jest": "^29.0.0",
    "ts-node": "^10.0.0"
  }
}
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/python.dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Python development environment
FROM python:{{ python_version }}-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    curl \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /workspace

{% if install_poetry %}
# Install Poetry
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH="/root/.local/bin:$PATH"
{% endif %}

{% if requirements_file %}
# Install Python dependencies
COPY {{ requirements_file }} .
RUN pip install -r {{ requirements_file }}
{% endif %}

{% if additional_packages %}
# Install additional packages
RUN pip install {% for package in additional_packages %}{{ package }} {% endfor %}
{% endif %}

# Set Python environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/logging.py:
--------------------------------------------------------------------------------

```python
"""Logging configuration for MCP Development Server."""
import logging
import sys
from typing import Optional

def setup_logging(name: Optional[str] = None, level: int = logging.INFO) -> logging.Logger:
    """Setup logging configuration.
    
    Args:
        name: Logger name
        level: Logging level
        
    Returns:
        logging.Logger: Configured logger instance
    """
    # Create logger
    logger = logging.getLogger(name or __name__)
    logger.setLevel(level)
    
    # Create stderr handler (MCP protocol requires clean stdout)
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(level)
    
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    
    # Add handler to logger
    logger.addHandler(handler)
    
    return logger
```
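
A minimal usage sketch for the helper above; the module names are illustrative:

```python
# Illustrative usage of setup_logging; loggers write to stderr so stdout
# stays clean for MCP protocol traffic.
import logging

from mcp_dev_server.utils.logging import setup_logging

logger = setup_logging(__name__)          # INFO level by default
logger.info("Server starting")

docker_logger = setup_logging("mcp_dev_server.docker", level=logging.DEBUG)
docker_logger.debug("Verbose diagnostics")
```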

--------------------------------------------------------------------------------
/src/mcp_dev_server/handlers/input_request_handler.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any, Optional
from ..models import InputResponse

class InputRequestHandler:
    """Handler for input requests."""
    
    def __init__(self):
        """Initialize the input request handler."""
        pass
        
    async def request_input(self, request_type: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Request input from the user.
        
        Args:
            request_type: Type of input request
            context: Additional context for request
            
        Returns:
            Dict[str, Any]: User's input values
        """
        return {}  # TODO: Implement input request handling
        
    def handle_response(self, response: InputResponse):
        """Handle input response from user.
        
        Args:
            response: User's response
        """
        pass  # TODO: Implement response handling
```

--------------------------------------------------------------------------------
/tests/test_integration.py:
--------------------------------------------------------------------------------

```python
"""Test MCP server integration with Claude."""
import asyncio
import pytest
from mcp_dev_server.server import MCPDevServer
from mcp_dev_server.utils.config import Config

@pytest.mark.asyncio
async def test_server_initialization():
    """Test server initialization."""
    config = Config()
    server = MCPDevServer()
    
    # Test project creation
    project = await server.project_manager.create_project(
        name="test-project",
        project_type="python",
        project_config={
            "python_version": "3.12",
            "project_type": "fastapi",
            "dependency_management": "poetry"
        }
    )
    
    assert project is not None
    assert project.config["name"] == "test-project"
    
    # Test tool execution
    result = await server.handle_call_tool("build", {
        "environment": "default",
        "command": "build"
    })
    
    assert result[0].type == "text"
    
    # Cleanup
    await server.cleanup()
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates.py:
--------------------------------------------------------------------------------

```python
"""Dockerfile templates for different environments."""
from typing import Any, Dict, Optional
from jinja2 import Template

class DockerTemplates:
    """Manages Dockerfile templates for different environments."""
    
    @staticmethod
    def get_template(environment: str, config: Optional[Dict[str, Any]] = None) -> str:
        """Get Dockerfile template for specific environment."""
        config = config or {}
        
        if environment == "python":
            return Template("""
FROM python:{{ python_version|default('3.12-slim') }}

WORKDIR /app

{% if requirements_file %}
COPY {{ requirements_file }} .
RUN pip install -r {{ requirements_file }}
{% endif %}

{% if install_dev_deps %}
RUN pip install pytest mypy black
{% endif %}

{% for cmd in additional_commands|default([]) %}
RUN {{ cmd }}
{% endfor %}

COPY . .

CMD ["python", "{{ entry_point|default('main.py') }}"]
""").render(config)
            
        elif environment == "node":
            return Template("""
FROM node:{{ node_version|default('20-slim') }}

WORKDIR /app

COPY package*.json ./

RUN npm install {% if install_dev_deps %}--include=dev{% endif %}

{% for cmd in additional_commands|default([]) %}
RUN {{ cmd }}
{% endfor %}

COPY . .

CMD ["npm", "{{ npm_command|default('start') }}"]
""").render(config)
            
        else:
            raise ValueError(f"Unknown environment: {environment}")
```
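
A short usage sketch for the `DockerTemplates.get_template` helper above; the configuration values are illustrative assumptions, not fixed defaults.

```python
# Sketch: render the Python environment template defined above.
# The keys mirror the Jinja2 variables in the template; values are examples.
from mcp_dev_server.docker.templates import DockerTemplates

dockerfile = DockerTemplates.get_template(
    "python",
    {
        "python_version": "3.12-slim",
        "requirements_file": "requirements.txt",
        "install_dev_deps": True,
        "additional_commands": ["pip install --upgrade pip"],
        "entry_point": "main.py",
    },
)
print(dockerfile)  # rendered Dockerfile text, ready to write to disk
```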

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/templates/dev.dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Multi-language development environment
FROM ubuntu:{{ ubuntu_version }}

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    curl \
    build-essential \
    software-properties-common \
    && rm -rf /var/lib/apt/lists/*

{% if install_python %}
# Install Python
RUN add-apt-repository ppa:deadsnakes/ppa && \
    apt-get update && \
    apt-get install -y python{{ python_version }} python{{ python_version }}-venv python{{ python_version }}-dev && \
    rm -rf /var/lib/apt/lists/*
{% endif %}

{% if install_node %}
# Install Node.js
RUN curl -fsSL https://deb.nodesource.com/setup_{{ node_version }}.x | bash - && \
    apt-get install -y nodejs && \
    rm -rf /var/lib/apt/lists/*
{% endif %}

{% if install_docker %}
# Install Docker
RUN curl -fsSL https://get.docker.com | sh && \
    rm -rf /var/lib/apt/lists/*
{% endif %}

# Set working directory
WORKDIR /workspace

{% if requirements_file %}
# Install Python dependencies
COPY {{ requirements_file }} .
RUN pip{{ python_version }} install -r {{ requirements_file }}
{% endif %}

{% if package_file %}
# Install Node.js dependencies
COPY {{ package_file }} .
{% if package_lock %}
COPY {{ package_lock }} .
RUN npm ci
{% else %}
RUN npm install
{% endif %}
{% endif %}

{% if additional_tools %}
# Install additional tools
RUN apt-get update && apt-get install -y \
    {% for tool in additional_tools %}{{ tool }} {% endfor %} \
    && rm -rf /var/lib/apt/lists/*
{% endif %}

# Set environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    NODE_ENV=development

{% if command %}
# Default command
CMD {{ command }}
{% endif %}
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/handler.py:
--------------------------------------------------------------------------------

```python
[Previous handler.py content...]

    async def process_field_dependencies(self, request: InputRequest, field_updates: Dict[str, Any]):
        """Process field dependencies based on user input.
        
        Some fields might need to be updated based on values of other fields.
        For example, if the user selects Python as the language, the Python version field becomes required.
        
        Args:
            request: Current input request
            field_updates: Updated field values
        """
        try:
            if request.request_id == "environment_setup":
                language = field_updates.get("language")
                if language:
                    # Update required fields based on language selection
                    for field in request.fields:
                        if field.name == "python_version":
                            field.required = language in ["python", "both"]
                        elif field.name == "node_version":
                            field.required = language in ["node", "both"]
                            
            elif request.request_id == "test_configuration":
                test_framework = field_updates.get("test_framework")
                if test_framework:
                    # Update coverage options based on test framework
                    for field in request.fields:
                        if field.name == "include_coverage":
                            field.options = self._get_coverage_options(test_framework)

        except Exception as e:
            # Log and surface dependency-processing failures (module-level logger assumed from the elided header above).
            logger.error(f"Failed to process field dependencies: {str(e)}")

    def _get_coverage_options(self, framework: str) -> List[Dict[str, str]]:
        """Get coverage tool options based on test framework."""
        coverage_tools = {
            "pytest": [
                {"value": "pytest-cov", "label": "pytest-cov"},
                {"value": "coverage", "label": "coverage.py"}
            ],
            "unittest": [
                {"value": "coverage", "label": "coverage.py"}
            ],
            "jest": [
                {"value": "jest-coverage", "label": "Jest Coverage"}
            ],
            "mocha": [
                {"value": "nyc", "label": "Istanbul/nyc"}
            ]
        }
        return coverage_tools.get(framework, [])
```

--------------------------------------------------------------------------------
/src/mcp_dev_server/utils/config.py:
--------------------------------------------------------------------------------

```python
"""Configuration management for MCP Development Server."""
import os
import json
from typing import Dict, Any, Optional
from pathlib import Path

class Config:
    """Configuration manager."""
    
    def __init__(self):
        """Initialize configuration."""
        self.config_dir = self._get_config_dir()
        self.config_file = self.config_dir / "config.json"
        self.config: Dict[str, Any] = self._load_config()
        
    def _get_config_dir(self) -> Path:
        """Get configuration directory path."""
        if os.name == "nt":  # Windows
            config_dir = Path(os.getenv("APPDATA")) / "Claude"
        else:  # macOS/Linux
            config_dir = Path.home() / ".config" / "claude"
            
        config_dir.mkdir(parents=True, exist_ok=True)
        return config_dir
        
    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file."""
        if self.config_file.exists():
            try:
                with open(self.config_file, "r") as f:
                    return json.load(f)
            except Exception as e:
                print(f"Error loading config: {e}")
                return self._get_default_config()
        else:
            config = self._get_default_config()
            self._save_config(config)
            return config
            
    def _save_config(self, config: Dict[str, Any]):
        """Save configuration to file."""
        try:
            with open(self.config_file, "w") as f:
                json.dump(config, f, indent=2)
        except Exception as e:
            print(f"Error saving config: {e}")
            
    def _get_default_config(self) -> Dict[str, Any]:
        """Get default configuration."""
        return {
            "projectsDir": str(Path.home() / "Projects"),
            "templatesDir": str(self.config_dir / "templates"),
            "environments": {
                "default": {
                    "type": "docker",
                    "image": "python:3.12-slim"
                }
            }
        }
        
    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value."""
        return self.config.get(key, default)
        
    def set(self, key: str, value: Any):
        """Set configuration value."""
        self.config[key] = value
        self._save_config(self.config)
        
    def update(self, updates: Dict[str, Any]):
        """Update multiple configuration values."""
        self.config.update(updates)
        self._save_config(self.config)
```
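
A brief sketch of typical use of this `Config` class; note that instantiating it creates `config.json` under the Claude config directory if it does not already exist.

```python
# Sketch: read and update the persisted server configuration.
from mcp_dev_server.utils.config import Config

config = Config()                                    # loads or creates config.json
projects_dir = config.get("projectsDir")             # defaults to ~/Projects
environments = config.get("environments", {})

config.set("debug", True)                            # written back immediately
config.update({"projectsDir": "/tmp/mcp-projects"})  # bulk update, also persisted
```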

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/volumes.py:
--------------------------------------------------------------------------------

```python
"""Docker volume management for MCP Development Server."""
from typing import Any, Dict, List, Optional
import docker
from docker.errors import DockerException

from ..utils.logging import setup_logging
from ..utils.errors import DockerError

logger = setup_logging(__name__)

class VolumeManager:
    """Manages Docker volumes for development environments."""
    
    def __init__(self):
        self.client = docker.from_env()
        
    async def create_volume(
        self,
        name: str,
        labels: Optional[Dict[str, str]] = None
    ) -> str:
        """Create a Docker volume."""
        try:
            volume = self.client.volumes.create(
                name=name,
                driver='local',
                labels=labels or {}
            )
            logger.info(f"Created volume: {name}")
            return volume.name
            
        except DockerException as e:
            raise DockerError(f"Failed to create volume: {str(e)}")
            
    async def remove_volume(self, name: str) -> None:
        """Remove a Docker volume."""
        try:
            volume = self.client.volumes.get(name)
            volume.remove()
            logger.info(f"Removed volume: {name}")
            
        except DockerException as e:
            raise DockerError(f"Failed to remove volume: {str(e)}")
            
    async def list_volumes(
        self,
        filters: Optional[Dict[str, str]] = None
    ) -> List[Dict[str, Any]]:
        """List Docker volumes."""
        try:
            volumes = self.client.volumes.list(filters=filters or {})
            return [
                {
                    "name": v.name,
                    "driver": v.attrs['Driver'],
                    "mountpoint": v.attrs['Mountpoint'],
                    "labels": v.attrs['Labels'] or {}
                }
                for v in volumes
            ]
            
        except DockerException as e:
            raise DockerError(f"Failed to list volumes: {str(e)}")
            
    async def get_volume_info(self, name: str) -> Dict[str, Any]:
        """Get detailed information about a volume."""
        try:
            volume = self.client.volumes.get(name)
            return {
                "name": volume.name,
                "driver": volume.attrs['Driver'],
                "mountpoint": volume.attrs['Mountpoint'],
                "labels": volume.attrs['Labels'] or {},
                "scope": volume.attrs['Scope'],
                "status": volume.attrs.get('Status', {})
            }
            
        except DockerException as e:
            raise DockerError(f"Failed to get volume info: {str(e)}")

```

--------------------------------------------------------------------------------
/src/mcp_dev_server/package/manager.py:
--------------------------------------------------------------------------------

```python
"""Package management integration for MCP Development Server."""

from typing import Dict, List, Optional, Any
from enum import Enum
from ..utils.errors import PackageError
from ..utils.logging import setup_logging

logger = setup_logging(__name__)

class PackageManager(str, Enum):
    """Supported package managers."""
    NPM = "npm"
    PIP = "pip"
    CARGO = "cargo"

class DependencyManager:
    """Manages project dependencies."""
    
    def __init__(self, env_manager):
        self.env_manager = env_manager
        
    async def install_dependencies(
        self,
        environment: str,
        package_manager: PackageManager,
        dependencies: List[str],
        dev: bool = False
    ) -> Dict[str, Any]:
        """Install project dependencies."""
        try:
            command = self._build_install_command(
                package_manager,
                dependencies,
                dev
            )
            
            result = await self.env_manager.execute_in_environment(
                environment,
                command
            )
            
            return {
                "success": result["exit_code"] == 0,
                "output": result["output"],
                "error": result.get("error")
            }
            
        except Exception as e:
            raise PackageError(f"Failed to install dependencies: {str(e)}")
            
    async def update_dependencies(
        self,
        environment: str,
        package_manager: PackageManager,
        dependencies: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Update project dependencies."""
        try:
            command = self._build_update_command(package_manager, dependencies)
            
            result = await self.env_manager.execute_in_environment(
                environment,
                command
            )
            
            return {
                "success": result["exit_code"] == 0,
                "output": result["output"],
                "error": result.get("error")
            }
            
        except Exception as e:
            raise PackageError(f"Failed to update dependencies: {str(e)}")
            
    def _build_install_command(
        self,
        package_manager: PackageManager,
        dependencies: List[str],
        dev: bool
    ) -> str:
        """Build dependency installation command."""
        if package_manager == PackageManager.NPM:
            dev_flag = "--save-dev" if dev else ""
            deps = " ".join(dependencies)
            return f"npm install {dev_flag} {deps}"
            
        elif package_manager == PackageManager.PIP:
            # pip has no dev-dependency flag; dev packages are installed the same way
            deps = " ".join(dependencies)
            return f"pip install {deps}"
            
        elif package_manager == PackageManager.CARGO:
            dev_flag = "--dev" if dev else ""
            deps = " ".join(dependencies)
            return f"cargo add {dev_flag} {deps}"
            
        else:
            raise PackageError(f"Unsupported package manager: {package_manager}")
            
    def _build_update_command(
        self,
        package_manager: PackageManager,
        dependencies: Optional[List[str]] = None
    ) -> str:
        """Build dependency update command."""
        if package_manager == PackageManager.NPM:
            return "npm update" if not dependencies else f"npm update {' '.join(dependencies)}"
            
        elif package_manager == PackageManager.PIP:
            return "pip install -U -r requirements.txt" if not dependencies else f"pip install -U {' '.join(dependencies)}"
            
        elif package_manager == PackageManager.CARGO:
            return "cargo update" if not dependencies else f"cargo update {' '.join(dependencies)}"
            
        else:
            raise PackageError(f"Unsupported package manager: {package_manager}")

```
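
A sketch of how `DependencyManager` composes install commands and delegates execution; `EchoEnvManager` below is a hypothetical stand-in for the real environment manager, which only needs to expose `execute_in_environment(environment, command)` returning an exit code and output.

```python
# Sketch: drive DependencyManager with a stub environment manager.
import asyncio

from mcp_dev_server.package.manager import DependencyManager, PackageManager

class EchoEnvManager:
    """Hypothetical stand-in that just echoes the command it would run."""
    async def execute_in_environment(self, environment: str, command: str) -> dict:
        print(f"[{environment}] {command}")
        return {"exit_code": 0, "output": "ok"}

async def main():
    deps = DependencyManager(EchoEnvManager())
    result = await deps.install_dependencies(
        environment="default",
        package_manager=PackageManager.PIP,
        dependencies=["fastapi", "uvicorn"],
        dev=False,
    )
    print(result["success"])  # True when the command exited with code 0

asyncio.run(main())
```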

--------------------------------------------------------------------------------
/src/mcp_dev_server/test/manager.py:
--------------------------------------------------------------------------------

```python
"""Test system integration for MCP Development Server."""

import asyncio
from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
from ..utils.errors import TestError
from ..utils.logging import setup_logging

logger = setup_logging(__name__)

class TestStatus(str, Enum):
    """Test execution status."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    ERROR = "error"

class TestManager:
    """Manages test execution and reporting."""
    
    def __init__(self, env_manager):
        self.env_manager = env_manager
        self.test_runs: Dict[str, Dict[str, Any]] = {}
        
    async def run_tests(
        self,
        environment: str,
        config: Dict[str, Any]
    ) -> str:
        """Start a test run."""
        try:
            test_id = f"test_{len(self.test_runs)}"
            
            # Initialize test run
            self.test_runs[test_id] = {
                "environment": environment,
                "config": config,
                "status": TestStatus.PENDING,
                "results": [],
                "start_time": datetime.now(),
                "end_time": None
            }
            
            # Start test execution
            asyncio.create_task(self._execute_tests(test_id))
            
            return test_id
            
        except Exception as e:
            raise TestError(f"Failed to start tests: {str(e)}")
            
    async def _execute_tests(self, test_id: str) -> None:
        """Execute test suite."""
        try:
            test_run = self.test_runs[test_id]
            test_run["status"] = TestStatus.RUNNING
            
            # Run test command
            result = await self.env_manager.execute_in_environment(
                test_run["environment"],
                test_run["config"].get("command", "npm test"),
                workdir=test_run["config"].get("workdir")
            )
            
            # Parse and store results
            test_run["results"] = self._parse_test_output(
                result["output"],
                test_run["config"].get("format", "jest")
            )
            
            # Update test status
            test_run["end_time"] = datetime.now()
            test_run["status"] = (
                TestStatus.SUCCESS
                if result["exit_code"] == 0
                else TestStatus.FAILED
            )
            
        except Exception as e:
            logger.error(f"Test execution error: {str(e)}")
            test_run["status"] = TestStatus.ERROR
            test_run["error"] = str(e)
            
    async def get_test_status(self, test_id: str) -> Dict[str, Any]:
        """Get status and results of a test run."""
        if test_run := self.test_runs.get(test_id):
            return {
                "id": test_id,
                "status": test_run["status"],
                "results": test_run["results"],
                "start_time": test_run["start_time"],
                "end_time": test_run["end_time"],
                "error": test_run.get("error")
            }
        raise TestError(f"Test run not found: {test_id}")
        
    def _parse_test_output(
        self,
        output: str,
        format: str
    ) -> List[Dict[str, Any]]:
        """Parse test output into structured results."""
        if format == "jest":
            return self._parse_jest_output(output)
        elif format == "pytest":
            return self._parse_pytest_output(output)
        else:
            logger.warning(f"Unknown test output format: {format}")
            return [{"raw_output": output}]
            
    def _parse_jest_output(self, output: str) -> List[Dict[str, Any]]:
        """Parse Jest test output."""
        results = []
        # Implement Jest output parsing
        return results
        
    def _parse_pytest_output(self, output: str) -> List[Dict[str, Any]]:
        """Parse pytest output."""
        results = []
        # Implement pytest output parsing
        return results

```
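
A sketch of the intended `run_tests` / `get_test_status` flow; the stub environment manager is an assumption standing in for the real one.

```python
# Sketch: start a test run against a stub environment and poll its status.
import asyncio

from mcp_dev_server.test.manager import TestManager

class FakeEnvManager:
    """Hypothetical environment manager returning a canned test result."""
    async def execute_in_environment(self, environment, command, workdir=None):
        return {"exit_code": 0, "output": "1 passed"}

async def main():
    tests = TestManager(FakeEnvManager())
    test_id = await tests.run_tests("default", {"command": "pytest", "format": "pytest"})
    await asyncio.sleep(0.1)              # give the background task time to finish
    status = await tests.get_test_status(test_id)
    print(status["status"], status["results"])

asyncio.run(main())
```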

--------------------------------------------------------------------------------
/src/mcp_dev_server/server.py:
--------------------------------------------------------------------------------

```python
"""MCP Development Server implementation."""
from typing import Dict, Any, Optional, Sequence
import logging
import sys
import json

# Import MCP components
from mcp.server import Server as MCPServer
from mcp.server.stdio import stdio_server
import mcp.types as types

from .models import Config, InputResponse, MCPDevServerError
from .managers import (
    ProjectManager, 
    TemplateManager,
    BuildManager,
    DependencyManager,
    TestManager,
    WorkflowManager
)
from .handlers import InputRequestHandler

# Configure logging to stderr to keep stdout clean
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)  # Set to DEBUG for development

class MCPDevServer:
    """MCP Development Server implementation."""
    
    def __init__(self):
        """Initialize the MCP Development Server."""
        logger.info("Initializing MCP Development Server")
        
        try:
            # Initialize server
            self.server = MCPServer("mcp-dev-server")
            
            # Initialize configuration
            self.config = Config()
            
            # Initialize all managers
            self.project_manager = ProjectManager(self.config)
            self.template_manager = TemplateManager()
            self.build_manager = BuildManager()
            self.dependency_manager = DependencyManager()
            self.test_manager = TestManager()
            self.workflow_manager = WorkflowManager()
            self.input_handler = InputRequestHandler()
            
            # Setup request handlers
            self._setup_resource_handlers()
            self._setup_tool_handlers()
            self._setup_prompt_handlers()
            
            logger.info("Server initialization completed successfully")
            
        except Exception as e:
            logger.error(f"Failed to initialize server: {e}")
            raise

    def _setup_resource_handlers(self):
        """Set up resource request handlers."""
        @self.server.list_resources()
        async def list_resources() -> list[types.Resource]:
            """List available resources."""
            logger.debug("Listing resources")
            return []

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read resource content."""
            logger.debug(f"Reading resource: {uri}")
            return ""

    def _setup_tool_handlers(self):
        """Set up tool request handlers."""
        @self.server.list_tools()
        async def list_tools() -> list[types.Tool]:
            """List available tools."""
            logger.debug("Listing tools")
            return []

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> Sequence[types.TextContent]:
            """Execute a tool."""
            logger.debug(f"Calling tool {name} with arguments {arguments}")
            return [types.TextContent(type="text", text="Tool execution result")]

    def _setup_prompt_handlers(self):
        """Set up prompt request handlers."""
        @self.server.list_prompts()
        async def list_prompts() -> list[types.Prompt]:
            """List available prompts."""
            logger.debug("Listing prompts")
            return []

    async def run(self):
        """Run the MCP Development Server."""
        try:
            logger.info(f"Starting {self.server.name}...")
            
            # Use stdio transport
            async with stdio_server() as streams:
                logger.info("Using stdio transport")
                await self.server.run(
                    streams[0],  # read stream
                    streams[1],  # write stream
                    self.server.create_initialization_options(),
                    raise_exceptions=True  # Enable for debugging
                )
                
        except Exception as e:
            logger.error(f"Server error: {str(e)}")
            raise MCPDevServerError(f"Server error: {str(e)}")

        finally:
            logger.info("Server shutdown")

```

--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/templates.py:
--------------------------------------------------------------------------------

```python
"""Input request templates for common scenarios."""
from typing import Dict
from .input_protocol import InputRequest, InputField

ENVIRONMENT_SETUP = InputRequest(
    request_id="environment_setup",
    title="Setup Development Environment",
    description="Configure your development environment",
    fields=[
        InputField(
            name="language",
            type="select",
            description="Primary programming language",
            options=[
                {"value": "python", "label": "Python"},
                {"value": "node", "label": "Node.js"},
                {"value": "both", "label": "Python & Node.js"}
            ]
        ),
        InputField(
            name="python_version",
            type="select",
            description="Python version",
            options=[
                {"value": "3.12", "label": "Python 3.12"},
                {"value": "3.11", "label": "Python 3.11"},
                {"value": "3.10", "label": "Python 3.10"}
            ],
            required=False
        ),
        InputField(
            name="node_version",
            type="select",
            description="Node.js version",
            options=[
                {"value": "20", "label": "Node.js 20 LTS"},
                {"value": "18", "label": "Node.js 18 LTS"}
            ],
            required=False
        ),
        InputField(
            name="include_docker",
            type="confirm",
            description="Include Docker support?",
            default=False
        )
    ]
)

TEST_CONFIGURATION = InputRequest(
    request_id="test_configuration",
    title="Configure Test Environment",
    description="Set up testing parameters",
    fields=[
        InputField(
            name="test_framework",
            type="select",
            description="Testing framework",
            options=[
                {"value": "pytest", "label": "pytest"},
                {"value": "unittest", "label": "unittest"},
                {"value": "jest", "label": "Jest"},
                {"value": "mocha", "label": "Mocha"}
            ]
        ),
        InputField(
            name="include_coverage",
            type="confirm",
            description="Include coverage reporting?",
            default=True
        ),
        InputField(
            name="parallel",
            type="confirm",
            description="Run tests in parallel?",
            default=False
        ),
        InputField(
            name="test_path",
            type="text",
            description="Test directory or file pattern",
            default="tests/",
            required=False
        )
    ]
)

DEPLOYMENT_CONFIG = InputRequest(
    request_id="deployment_config",
    title="Configure Deployment",
    description="Set up deployment parameters",
    fields=[
        InputField(
            name="environment",
            type="select",
            description="Deployment environment",
            options=[
                {"value": "development", "label": "Development"},
                {"value": "staging", "label": "Staging"},
                {"value": "production", "label": "Production"}
            ]
        ),
        InputField(
            name="deploy_method",
            type="select",
            description="Deployment method",
            options=[
                {"value": "docker", "label": "Docker Container"},
                {"value": "kubernetes", "label": "Kubernetes"},
                {"value": "serverless", "label": "Serverless"}
            ]
        ),
        InputField(
            name="auto_deploy",
            type="confirm",
            description="Enable automatic deployment?",
            default=False
        ),
        InputField(
            name="rollback_enabled",
            type="confirm",
            description="Enable automatic rollback?",
            default=True
        )
    ]
)

DEBUG_CONFIG = InputRequest(
    request_id="debug_config",
    title="Configure Debugging Session",
    description="Set up debugging parameters",
    fields=[
        InputField(
            name="debug_type",
            type="select",
            description="Type of debugging",
            options=[
                {"value": "python", "label": "Python Debugger"},
                {"value": "node", "label": "Node.js Debugger"},
                {"value": "remote", "label": "Remote Debugging"}
            ]
        ),
        InputField(
            name="port",
            type="number",
            description="Debug port",
            default=9229,
            validation={"min": 1024, "max": 65535}
        ),
        InputField(
            name="break_on_entry",
            type="confirm",
            description="Break on entry point?",
            default=True
        )
    ]
)

TEMPLATE_REQUESTS: Dict[str, InputRequest] = {
    "environment_setup": ENVIRONMENT_SETUP,
    "test_configuration": TEST_CONFIGURATION,
    "deployment_config": DEPLOYMENT_CONFIG,
    "debug_config": DEBUG_CONFIG
}
```
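
A short sketch of consuming the template registry above, assuming `InputRequest` and `InputField` (defined in the elided `input_protocol.py`) expose their constructor arguments as attributes:

```python
# Sketch: look up a predefined input request and inspect its fields.
# Attribute names follow the constructor keywords used above.
from mcp_dev_server.prompts.templates import TEMPLATE_REQUESTS

request = TEMPLATE_REQUESTS["environment_setup"]
print(request.title)                      # "Setup Development Environment"
for field in request.fields:
    print(f"- {field.name} ({field.type}): {field.description}")
```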

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/project_types.py:
--------------------------------------------------------------------------------

```python
"""Project type definitions and configurations."""
from typing import Dict, Any, List
from enum import Enum

class BuildSystem(str, Enum):
    """Build system types."""
    MAVEN = "maven"
    GRADLE = "gradle"
    NPM = "npm"
    YARN = "yarn"
    PIP = "pip"
    POETRY = "poetry"
    DOTNET = "dotnet"
    CARGO = "cargo"
    GO = "go"
    SBT = "sbt"

class ProjectType:
    """Base project type configuration."""
    
    def __init__(
        self,
        name: str,
        description: str,
        file_structure: Dict[str, Any],
        build_systems: List[BuildSystem],
        default_build_system: BuildSystem,
        config_files: List[str],
        environment_variables: Dict[str, str],
        docker_templates: List[str],
        input_templates: List[str]
    ):
        self.name = name
        self.description = description
        self.file_structure = file_structure
        self.build_systems = build_systems
        self.default_build_system = default_build_system
        self.config_files = config_files
        self.environment_variables = environment_variables
        self.docker_templates = docker_templates
        self.input_templates = input_templates

# Define standard project types
JAVA_PROJECT = ProjectType(
    name="java",
    description="Java project",
    file_structure={
        "src/": {
            "main/": {
                "java/": {},
                "resources/": {}
            },
            "test/": {
                "java/": {},
                "resources/": {}
            }
        },
        "target/": {},
    },
    build_systems=[BuildSystem.MAVEN, BuildSystem.GRADLE],
    default_build_system=BuildSystem.MAVEN,
    config_files=["pom.xml", "build.gradle", ".gitignore", "README.md"],
    environment_variables={
        "JAVA_HOME": "",
        "MAVEN_HOME": "",
        "GRADLE_HOME": ""
    },
    docker_templates=["java-maven", "java-gradle"],
    input_templates=["java_config", "maven_config", "gradle_config"]
)

DOTNET_PROJECT = ProjectType(
    name="dotnet",
    description=".NET project",
    file_structure={
        "src/": {},
        "tests/": {},
        "docs/": {}
    },
    build_systems=[BuildSystem.DOTNET],
    default_build_system=BuildSystem.DOTNET,
    config_files=[".csproj", ".sln", "global.json", ".gitignore", "README.md"],
    environment_variables={
        "DOTNET_ROOT": "",
        "ASPNETCORE_ENVIRONMENT": "Development"
    },
    docker_templates=["dotnet-sdk", "dotnet-runtime"],
    input_templates=["dotnet_config", "aspnet_config"]
)

NODE_PROJECT = ProjectType(
    name="node",
    description="Node.js project",
    file_structure={
        "src/": {},
        "tests/": {},
        "dist/": {},
        "public/": {}
    },
    build_systems=[BuildSystem.NPM, BuildSystem.YARN],
    default_build_system=BuildSystem.NPM,
    config_files=["package.json", "tsconfig.json", ".gitignore", "README.md"],
    environment_variables={
        "NODE_ENV": "development",
        "NPM_TOKEN": ""
    },
    docker_templates=["node-dev", "node-prod"],
    input_templates=["node_config", "npm_config", "typescript_config"]
)

PYTHON_PROJECT = ProjectType(
    name="python",
    description="Python project",
    file_structure={
        "src/": {},
        "tests/": {},
        "docs/": {},
        "notebooks/": {}
    },
    build_systems=[BuildSystem.PIP, BuildSystem.POETRY],
    default_build_system=BuildSystem.POETRY,
    config_files=["pyproject.toml", "setup.py", "requirements.txt", ".gitignore", "README.md"],
    environment_variables={
        "PYTHONPATH": "src",
        "PYTHON_ENV": "development"
    },
    docker_templates=["python-dev", "python-prod"],
    input_templates=["python_config", "poetry_config", "pytest_config"]
)

GOLANG_PROJECT = ProjectType(
    name="golang",
    description="Go project",
    file_structure={
        "cmd/": {},
        "internal/": {},
        "pkg/": {},
        "api/": {}
    },
    build_systems=[BuildSystem.GO],
    default_build_system=BuildSystem.GO,
    config_files=["go.mod", "go.sum", ".gitignore", "README.md"],
    environment_variables={
        "GOPATH": "",
        "GO111MODULE": "on"
    },
    docker_templates=["golang-dev", "golang-prod"],
    input_templates=["golang_config", "go_mod_config"]
)

RUST_PROJECT = ProjectType(
    name="rust",
    description="Rust project",
    file_structure={
        "src/": {},
        "tests/": {},
        "benches/": {},
        "examples/": {}
    },
    build_systems=[BuildSystem.CARGO],
    default_build_system=BuildSystem.CARGO,
    config_files=["Cargo.toml", "Cargo.lock", ".gitignore", "README.md"],
    environment_variables={
        "RUST_BACKTRACE": "1",
        "CARGO_HOME": ""
    },
    docker_templates=["rust-dev", "rust-prod"],
    input_templates=["rust_config", "cargo_config"]
)

# Map of all available project types
PROJECT_TYPES: Dict[str, ProjectType] = {
    "java": JAVA_PROJECT,
    "dotnet": DOTNET_PROJECT,
    "node": NODE_PROJECT,
    "python": PYTHON_PROJECT,
    "golang": GOLANG_PROJECT,
    "rust": RUST_PROJECT
}
```
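
A minimal lookup sketch (assuming this module is importable as `mcp_dev_server.project_manager.project_types` under the src/ layout shown earlier) of how a caller can resolve a registered project type from `PROJECT_TYPES` and read its defaults:

```python
# Hypothetical usage; the import path assumes the package is installed
# from the src/ layout shown in the directory structure.
from mcp_dev_server.project_manager.project_types import PROJECT_TYPES

def describe_project_type(name: str) -> str:
    """Summarize a registered project type in one line."""
    project_type = PROJECT_TYPES.get(name)
    if project_type is None:
        raise ValueError(f"Unknown project type: {name}")
    return (
        f"{project_type.name}: {project_type.description} "
        f"(default build system: {project_type.default_build_system}, "
        f"config files: {', '.join(project_type.config_files)})"
    )

if __name__ == "__main__":
    print(describe_project_type("python"))
```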

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/git.py:
--------------------------------------------------------------------------------

```python
"""Git integration for MCP Development Server."""
import os
from typing import List, Optional
from git import Repo

from ..utils.logging import setup_logging
from ..utils.errors import GitError

logger = setup_logging(__name__)

class GitManager:
    """Manages Git operations for a project."""
    
    def __init__(self, project_path: str):
        self.project_path = project_path
        self.repo: Optional[Repo] = None
        
    async def initialize(self) -> None:
        """Initialize Git repository."""
        try:
            self.repo = Repo.init(self.project_path)
            
            # Create default .gitignore if it doesn't exist
            gitignore_path = os.path.join(self.project_path, '.gitignore')
            if not os.path.exists(gitignore_path):
                with open(gitignore_path, 'w') as f:
                    f.write('\n'.join([
                        '# Python',
                        '__pycache__/',
                        '*.pyc',
                        '*.pyo',
                        '*.pyd',
                        '.Python',
                        'env/',
                        'venv/',
                        '.env',
                        '.venv',
                        '',
                        '# IDE',
                        '.idea/',
                        '.vscode/',
                        '*.swp',
                        '*.swo',
                        '',
                        '# Project specific',
                        '.mcp/',
                        'dist/',
                        'build/',
                        '*.egg-info/',
                        ''
                    ]))
                
            # Initial commit
            if not self.repo.heads:
                self.repo.index.add(['.gitignore'])
                self.repo.index.commit("Initial commit")
                
            logger.info(f"Initialized Git repository at {self.project_path}")
            
        except Exception as e:
            raise GitError(f"Git initialization failed: {str(e)}")
            
    async def get_status(self) -> dict:
        """Get repository status."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            return {
                "branch": self.repo.active_branch.name,
                "changed_files": [item.a_path for item in self.repo.index.diff(None)],
                "untracked_files": self.repo.untracked_files,
                "is_dirty": self.repo.is_dirty(),
                "head_commit": {
                    "hash": self.repo.head.commit.hexsha,
                    "message": self.repo.head.commit.message,
                    "author": str(self.repo.head.commit.author),
                    "date": str(self.repo.head.commit.authored_datetime)
                }
            }
            
        except Exception as e:
            raise GitError(f"Failed to get Git status: {str(e)}")
            
    async def commit(self, message: str, files: Optional[List[str]] = None) -> str:
        """Create a new commit."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            # Add specified files or all changes
            if files:
                self.repo.index.add(files)
            else:
                # Stage everything, including deletions (index.add('.') does not)
                self.repo.git.add(A=True)
                
            # Create commit
            commit = self.repo.index.commit(message)
            logger.info(f"Created commit: {commit.hexsha}")
            
            return commit.hexsha
            
        except Exception as e:
            raise GitError(f"Failed to create commit: {str(e)}")
            
    async def get_commit_history(
        self,
        max_count: Optional[int] = None
    ) -> List[dict]:
        """Get commit history."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            commits = []
            for commit in self.repo.iter_commits(max_count=max_count):
                commits.append({
                    "hash": commit.hexsha,
                    "message": commit.message,
                    "author": str(commit.author),
                    "date": str(commit.authored_datetime),
                    "files": list(commit.stats.files.keys())
                })
                
            return commits
            
        except Exception as e:
            raise GitError(f"Failed to get commit history: {str(e)}")
            
    async def create_branch(self, name: str) -> None:
        """Create a new branch."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            self.repo.create_head(name)
            logger.info(f"Created branch: {name}")
            
        except Exception as e:
            raise GitError(f"Failed to create branch: {str(e)}")
            
    async def checkout(self, branch: str) -> None:
        """Checkout a branch."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            self.repo.git.checkout(branch)
            logger.info(f"Checked out branch: {branch}")
            
        except Exception as e:
            raise GitError(f"Failed to checkout branch: {str(e)}")
            
    async def get_diff(
        self,
        commit_a: Optional[str] = None,
        commit_b: Optional[str] = None
    ) -> str:
        """Get diff between commits or working directory."""
        try:
            if not self.repo:
                raise GitError("Git repository not initialized")
                
            return self.repo.git.diff(commit_a, commit_b)
            
        except Exception as e:
            raise GitError(f"Failed to get diff: {str(e)}")
            
    async def cleanup(self) -> None:
        """Clean up Git resources."""
        self.repo = None
```
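
A usage sketch for `GitManager`, assuming an asyncio entry point and a writable `./demo-project` path (which `Repo.init` creates if missing):

```python
# Hedged usage sketch; the project path is a placeholder.
import asyncio
from mcp_dev_server.project_manager.git import GitManager

async def main() -> None:
    git = GitManager("./demo-project")
    await git.initialize()                       # init repo and default .gitignore
    await git.create_branch("feature/demo")
    await git.checkout("feature/demo")
    await git.commit("Add project scaffolding")  # stages and commits all changes
    for entry in await git.get_commit_history(max_count=5):
        print(entry["hash"][:8], entry["message"].strip())

if __name__ == "__main__":
    asyncio.run(main())
```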

--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/manager.py:
--------------------------------------------------------------------------------

```python
"""Environment management for MCP Development Server."""
import os
import json
import shutil
from typing import Dict, Any, Optional

from ..docker.manager import DockerManager
from ..docker.volumes import VolumeManager
from ..docker.templates import DockerTemplates
from ..utils.logging import setup_logging
from ..utils.errors import EnvironmentError

logger = setup_logging(__name__)

class EnvironmentManager:
    """Manages development environments."""
    
    def __init__(self):
        self.docker_manager = DockerManager()
        self.volume_manager = VolumeManager()
        self.environments: Dict[str, Dict[str, Any]] = {}
        
    async def create_environment(
        self,
        name: str,
        project_path: str,
        env_type: str,
        config: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create a new development environment."""
        try:
            config = config or {}
            
            # Create environment directory
            env_path = os.path.join(project_path, '.mcp', 'environments', name)
            os.makedirs(env_path, exist_ok=True)
            
            # Generate Dockerfile
            dockerfile_content = DockerTemplates.get_template(env_type, config)
            dockerfile_path = os.path.join(env_path, 'Dockerfile')
            with open(dockerfile_path, 'w') as f:
                f.write(dockerfile_content)
            
            # Create volumes for persistence
            volumes = {}
            for volume_name in ['src', 'deps', 'cache']:
                volume = await self.volume_manager.create_volume(
                    f"mcp-{name}-{volume_name}",
                    labels={
                        'mcp.environment': name,
                        'mcp.volume.type': volume_name
                    }
                )
                volumes[volume] = {'bind': f'/app/{volume_name}', 'mode': 'rw'}
            
            # Create container
            container_id = await self.docker_manager.create_container(
                project_path=project_path,
                environment=name,
                dockerfile=dockerfile_path,
                volumes=volumes,
                environment_vars=config.get('env_vars'),
                ports=config.get('ports')
            )
            
            # Store environment configuration
            self.environments[name] = {
                'id': container_id,
                'type': env_type,
                'path': env_path,
                'config': config,
                'volumes': volumes
            }
            
            # Save environment metadata
            self._save_environment_metadata(name)
            
            logger.info(f"Created environment: {name}")
            return container_id
            
        except Exception as e:
            raise EnvironmentError(f"Failed to create environment: {str(e)}")
            
    async def remove_environment(self, name: str) -> None:
        """Remove a development environment."""
        try:
            if env := self.environments.get(name):
                # Stop container
                await self.docker_manager.stop_container(name)
                
                # Remove volumes
                for volume in env['volumes']:
                    await self.volume_manager.remove_volume(volume)
                
                # Remove environment directory
                shutil.rmtree(env['path'])
                
                # Remove from environments dict
                del self.environments[name]
                
                logger.info(f"Removed environment: {name}")
            else:
                raise EnvironmentError(f"Environment not found: {name}")
                
        except Exception as e:
            raise EnvironmentError(f"Failed to remove environment: {str(e)}")
            
    async def execute_in_environment(
        self,
        name: str,
        command: str,
        workdir: Optional[str] = None
    ) -> Dict[str, Any]:
        """Execute a command in an environment."""
        try:
            if name not in self.environments:
                raise EnvironmentError(f"Environment not found: {name}")
                
            return await self.docker_manager.execute_command(
                environment=name,
                command=command,
                workdir=workdir
            )
            
        except Exception as e:
            raise EnvironmentError(f"Failed to execute command: {str(e)}")
            
    async def get_environment_status(self, name: str) -> Dict[str, Any]:
        """Get environment status including container and volumes."""
        try:
            if env := self.environments.get(name):
                container_status = await self.docker_manager.get_container_status(name)
                
                volumes_status = {}
                for volume in env['volumes']:
                    volumes_status[volume] = await self.volume_manager.get_volume_info(volume)
                
                return {
                    'container': container_status,
                    'volumes': volumes_status,
                    'type': env['type'],
                    'config': env['config']
                }
            else:
                raise EnvironmentError(f"Environment not found: {name}")
                
        except Exception as e:
            raise EnvironmentError(f"Failed to get environment status: {str(e)}")
            
    def _save_environment_metadata(self, name: str) -> None:
        """Save environment metadata to disk."""
        if env := self.environments.get(name):
            metadata_path = os.path.join(env['path'], 'metadata.json')
            with open(metadata_path, 'w') as f:
                json.dump({
                    'name': name,
                    'type': env['type'],
                    'config': env['config'],
                    'volumes': list(env['volumes'].keys())
                }, f, indent=2)
                
    async def cleanup(self) -> None:
        """Clean up all environments."""
        for name in list(self.environments.keys()):
            try:
                await self.remove_environment(name)
            except Exception as e:
                logger.error(f"Error cleaning up environment {name}: {str(e)}")

```
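
A hedged usage sketch for `EnvironmentManager`: it assumes a running Docker daemon, that `"python"` is a template name `DockerTemplates.get_template` understands, and that `DockerManager` exposes the `create_container`/`execute_command(environment=...)` interface this class calls (which differs from the `docker/manager.py` shown later in this document):

```python
# Hedged sketch; names and paths are placeholders.
import asyncio
from mcp_dev_server.environments.manager import EnvironmentManager

async def main() -> None:
    manager = EnvironmentManager()
    try:
        await manager.create_environment(
            name="demo",
            project_path="/tmp/demo-project",
            env_type="python",
            config={"env_vars": {"PYTHON_ENV": "development"}},
        )
        print(await manager.get_environment_status("demo"))
        print(await manager.execute_in_environment("demo", "python --version"))
    finally:
        await manager.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
```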

--------------------------------------------------------------------------------
/src/mcp_dev_server/prompts/project_templates.py:
--------------------------------------------------------------------------------

```python
"""Project-specific input templates."""
from typing import Dict
from .input_protocol import InputRequest, InputField

# Java Project Templates
JAVA_CONFIG = InputRequest(
    request_id="java_config",
    title="Java Project Configuration",
    description="Configure Java project settings",
    fields=[
        InputField(
            name="java_version",
            type="select",
            description="Java version",
            options=[
                {"value": "21", "label": "Java 21 (LTS)"},
                {"value": "17", "label": "Java 17 (LTS)"},
                {"value": "11", "label": "Java 11 (LTS)"},
                {"value": "8", "label": "Java 8"}
            ]
        ),
        InputField(
            name="project_type",
            type="select",
            description="Project type",
            options=[
                {"value": "spring-boot", "label": "Spring Boot"},
                {"value": "jakarta-ee", "label": "Jakarta EE"},
                {"value": "android", "label": "Android"},
                {"value": "library", "label": "Java Library"}
            ]
        ),
        InputField(
            name="packaging",
            type="select",
            description="Packaging type",
            options=[
                {"value": "jar", "label": "JAR"},
                {"value": "war", "label": "WAR"},
                {"value": "ear", "label": "EAR"}
            ]
        )
    ]
)

# .NET Project Templates
DOTNET_CONFIG = InputRequest(
    request_id="dotnet_config",
    title=".NET Project Configuration",
    description="Configure .NET project settings",
    fields=[
        InputField(
            name="dotnet_version",
            type="select",
            description=".NET version",
            options=[
                {"value": "8.0", "label": ".NET 8.0"},
                {"value": "7.0", "label": ".NET 7.0"},
                {"value": "6.0", "label": ".NET 6.0 (LTS)"}
            ]
        ),
        InputField(
            name="project_type",
            type="select",
            description="Project type",
            options=[
                {"value": "webapi", "label": "ASP.NET Core Web API"},
                {"value": "mvc", "label": "ASP.NET Core MVC"},
                {"value": "blazor", "label": "Blazor"},
                {"value": "maui", "label": ".NET MAUI"},
                {"value": "library", "label": "Class Library"}
            ]
        ),
        InputField(
            name="authentication",
            type="select",
            description="Authentication type",
            options=[
                {"value": "none", "label": "None"},
                {"value": "individual", "label": "Individual Accounts"},
                {"value": "microsoft", "label": "Microsoft Identity Platform"},
                {"value": "windows", "label": "Windows Authentication"}
            ]
        )
    ]
)

# Node.js Project Templates
NODE_CONFIG = InputRequest(
    request_id="node_config",
    title="Node.js Project Configuration",
    description="Configure Node.js project settings",
    fields=[
        InputField(
            name="node_version",
            type="select",
            description="Node.js version",
            options=[
                {"value": "20", "label": "Node.js 20 (LTS)"},
                {"value": "18", "label": "Node.js 18 (LTS)"}
            ]
        ),
        InputField(
            name="project_type",
            type="select",
            description="Project type",
            options=[
                {"value": "express", "label": "Express.js"},
                {"value": "next", "label": "Next.js"},
                {"value": "nest", "label": "NestJS"},
                {"value": "library", "label": "NPM Package"}
            ]
        ),
        InputField(
            name="typescript",
            type="confirm",
            description="Use TypeScript?",
            default=True
        )
    ]
)

# Python Project Templates
PYTHON_CONFIG = InputRequest(
    request_id="python_config",
    title="Python Project Configuration",
    description="Configure Python project settings",
    fields=[
        InputField(
            name="python_version",
            type="select",
            description="Python version",
            options=[
                {"value": "3.12", "label": "Python 3.12"},
                {"value": "3.11", "label": "Python 3.11"},
                {"value": "3.10", "label": "Python 3.10"}
            ]
        ),
        InputField(
            name="project_type",
            type="select",
            description="Project type",
            options=[
                {"value": "fastapi", "label": "FastAPI"},
                {"value": "django", "label": "Django"},
                {"value": "flask", "label": "Flask"},
                {"value": "library", "label": "Python Package"}
            ]
        ),
        InputField(
            name="dependency_management",
            type="select",
            description="Dependency management",
            options=[
                {"value": "poetry", "label": "Poetry"},
                {"value": "pip", "label": "pip + requirements.txt"},
                {"value": "pipenv", "label": "Pipenv"}
            ]
        )
    ]
)

# Golang Project Templates
GOLANG_CONFIG = InputRequest(
    request_id="golang_config",
    title="Go Project Configuration",
    description="Configure Go project settings",
    fields=[
        InputField(
            name="go_version",
            type="select",
            description="Go version",
            options=[
                {"value": "1.22", "label": "Go 1.22"},
                {"value": "1.21", "label": "Go 1.21"},
                {"value": "1.20", "label": "Go 1.20"}
            ]
        ),
        InputField(
            name="project_type",
            type="select",
            description="Project type",
            options=[
                {"value": "gin", "label": "Gin Web Framework"},
                {"value": "echo", "label": "Echo Framework"},
                {"value": "cli", "label": "CLI Application"},
                {"value": "library", "label": "Go Module"}
            ]
        ),
        InputField(
            name="module_path",
            type="text",
            description="Module path (e.g., github.com/user/repo)",
            validation={"pattern": r"^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+(/[a-zA-Z0-9_.-]+)?$"}
        )
    ]
)

# All project templates
PROJECT_TEMPLATES: Dict[str, InputRequest] = {
    "java_config": JAVA_CONFIG,
    "dotnet_config": DOTNET_CONFIG,
    "node_config": NODE_CONFIG,
    "python_config": PYTHON_CONFIG,
    "golang_config": GOLANG_CONFIG
}
```
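
A sketch of how a client might render one of these input requests as a plain-text prompt; it reads only the `InputRequest`/`InputField` attributes that the templates above construct (`title`, `description`, `fields`, `options`):

```python
from mcp_dev_server.prompts.project_templates import PROJECT_TEMPLATES

def render_request(request_id: str) -> str:
    """Render an input request as a human-readable prompt."""
    request = PROJECT_TEMPLATES[request_id]
    lines = [f"{request.title}: {request.description}"]
    for field in request.fields:
        lines.append(f"- {field.name} ({field.type}): {field.description}")
        for option in getattr(field, "options", None) or []:
            lines.append(f"    * {option['value']}: {option['label']}")
    return "\n".join(lines)

if __name__ == "__main__":
    print(render_request("python_config"))
```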

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/templates.py:
--------------------------------------------------------------------------------

```python
"""Template system for project creation."""
import os
from pathlib import Path
from typing import Dict, Any, List
import jinja2
import yaml

from ..utils.logging import setup_logging
from ..utils.errors import ProjectError

logger = setup_logging(__name__)

class TemplateManager:
    """Manages project templates."""
    
    def __init__(self):
        """Initialize template manager."""
        self.template_dir = self._get_template_dir()
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(str(self.template_dir)),
            autoescape=jinja2.select_autoescape()
        )
        
    def _get_template_dir(self) -> Path:
        """Get templates directory path."""
        if os.name == "nt":  # Windows
            template_dir = Path(os.getenv("APPDATA")) / "Claude" / "templates"
        else:  # macOS/Linux
            template_dir = Path.home() / ".config" / "claude" / "templates"
            
        template_dir.mkdir(parents=True, exist_ok=True)
        
        # Initialize with basic template if empty
        if not any(template_dir.iterdir()):
            self._initialize_basic_template(template_dir)
            
        return template_dir
        
    def _initialize_basic_template(self, template_dir: Path):
        """Initialize basic project template.
        
        Args:
            template_dir: Templates directory path
        """
        basic_dir = template_dir / "basic"
        basic_dir.mkdir(exist_ok=True)
        
        # Create template configuration
        config = {
            "name": "basic",
            "description": "Basic project template",
            "version": "1.0.0",
            "files": [
                "README.md",
                "requirements.txt",
                ".gitignore",
                "src/__init__.py",
                "tests/__init__.py"
            ],
            "variables": {
                "project_name": "",
                "description": ""
            },
            "features": {
                "git": True,
                "tests": True,
                "docker": False
            }
        }
        
        with open(basic_dir / "template.yaml", "w") as f:
            yaml.dump(config, f)
            
        # Create template files
        readme_content = """# {{ project_name }}

{{ description }}

## Installation

```bash
pip install -r requirements.txt
```

## Usage

```python
from {{ project_name.lower() }} import main
```

## Testing

```bash
pytest tests/
```
"""
        
        with open(basic_dir / "README.md", "w") as f:
            f.write(readme_content)
            
        # Create source directory
        src_dir = basic_dir / "src"
        src_dir.mkdir(exist_ok=True)
        
        with open(src_dir / "__init__.py", "w") as f:
            f.write('"""{{ project_name }} package."""\n')
            
        # Create tests directory
        tests_dir = basic_dir / "tests"
        tests_dir.mkdir(exist_ok=True)
        
        with open(tests_dir / "__init__.py", "w") as f:
            f.write('"""Tests for {{ project_name }}."""\n')
            
        # Create requirements.txt
        with open(basic_dir / "requirements.txt", "w") as f:
            f.write("pytest>=7.0.0\n")
            
        # Create .gitignore
        gitignore_content = """__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
"""
        
        with open(basic_dir / ".gitignore", "w") as f:
            f.write(gitignore_content)
            
    async def apply_template(self, template_name: str, project: Any) -> None:
        """Apply template to project.
        
        Args:
            template_name: Name of template to apply
            project: Project instance
        """
        try:
            template_path = self.template_dir / template_name
            if not template_path.exists():
                raise ProjectError(f"Template not found: {template_name}")
                
            # Load template configuration
            with open(template_path / "template.yaml", "r") as f:
                template_config = yaml.safe_load(f)
                
            # Prepare template variables
            variables = {
                "project_name": project.config.name,
                "description": project.config.description
            }
            
            # Process each template file
            for file_path in template_config["files"]:
                template_file = template_path / file_path
                if template_file.exists():
                    # Create target directory if needed
                    target_path = Path(project.path) / file_path
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    
                    # Render template content
                    template = self.env.get_template(f"{template_name}/{file_path}")
                    content = template.render(**variables)
                    
                    # Write rendered content
                    with open(target_path, "w") as f:
                        f.write(content)
                        
            logger.info(f"Applied template {template_name} to project {project.config.name}")
            
        except Exception as e:
            logger.error(f"Failed to apply template: {str(e)}")
            raise ProjectError(f"Template application failed: {str(e)}")
            
    async def template_has_git(self, template_name: str) -> bool:
        """Check if template includes Git initialization.
        
        Args:
            template_name: Template name
            
        Returns:
            bool: True if template includes Git
        """
        try:
            template_path = self.template_dir / template_name
            if not template_path.exists():
                return False
                
            # Load template configuration
            with open(template_path / "template.yaml", "r") as f:
                template_config = yaml.safe_load(f)
                
            return template_config.get("features", {}).get("git", False)
            
        except Exception:
            return False
            
    def list_templates(self) -> List[Dict[str, Any]]:
        """Get list of available templates.
        
        Returns:
            List[Dict[str, Any]]: Template information
        """
        templates = []
        
        for template_dir in self.template_dir.iterdir():
            if template_dir.is_dir():
                config_path = template_dir / "template.yaml"
                if config_path.exists():
                    with open(config_path, "r") as f:
                        config = yaml.safe_load(f)
                        templates.append(config)
                        
        return templates
```
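
A sketch of driving `TemplateManager` directly. The stand-in `project` object supplies only the attributes `apply_template` reads (`path`, `config.name`, `config.description`); it is not the real `Project` class, and the path is a placeholder:

```python
import asyncio
from types import SimpleNamespace
from mcp_dev_server.project_manager.templates import TemplateManager

async def main() -> None:
    manager = TemplateManager()
    print("Available templates:", [t["name"] for t in manager.list_templates()])

    # Minimal stand-in for a project instance (placeholder path).
    project = SimpleNamespace(
        path="/tmp/demo-project",
        config=SimpleNamespace(name="demo", description="Demo project"),
    )
    await manager.apply_template("basic", project)

if __name__ == "__main__":
    asyncio.run(main())
```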

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/context.py:
--------------------------------------------------------------------------------

```python
"""Project context management for MCP Development Server."""
import os
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

from pydantic import BaseModel
from ..utils.config import ProjectConfig
from ..utils.logging import setup_logging
from ..utils.errors import ProjectError, FileOperationError

logger = setup_logging(__name__)

class ProjectState(BaseModel):
    """Project state tracking."""
    initialized: bool = False
    last_build_time: Optional[datetime] = None
    last_build_status: Optional[str] = None
    last_test_time: Optional[datetime] = None
    last_test_status: Optional[str] = None
    git_initialized: bool = False

class ProjectContext:
    """Manages the context and state of a development project."""
    
    def __init__(self, config: ProjectConfig):
        self.id = str(uuid.uuid4())
        self.config = config
        self.path = config.path
        self.state = ProjectState()
        self._file_watchers: Dict[str, Any] = {}
        
    async def initialize(self) -> None:
        """Initialize project structure and state."""
        try:
            # Create project directory
            os.makedirs(self.path, exist_ok=True)
            
            # Create project structure
            await self._create_project_structure()
            
            # Initialize state file
            await self._init_state_file()
            
            # Set up file watchers
            await self._setup_file_watchers()
            
            self.state.initialized = True
            logger.info(f"Initialized project {self.config.name} at {self.path}")
            
        except Exception as e:
            raise ProjectError(f"Project initialization failed: {str(e)}")
            
    async def _create_project_structure(self) -> None:
        """Create initial project directory structure."""
        try:
            # Create standard directories
            for dir_name in ['.mcp', 'src', 'tests', 'docs']:
                os.makedirs(os.path.join(self.path, dir_name), exist_ok=True)
                
            # Create basic configuration files
            config_path = os.path.join(self.path, '.mcp', 'project.json')
            with open(config_path, 'w') as f:
                json.dump(self.config.dict(), f, indent=2, default=str)
                
        except Exception as e:
            raise FileOperationError(f"Failed to create project structure: {str(e)}")
            
    async def _init_state_file(self) -> None:
        """Initialize project state file."""
        try:
            state_path = os.path.join(self.path, '.mcp', 'state.json')
            with open(state_path, 'w') as f:
                json.dump(self.state.dict(), f, indent=2, default=str)
                
        except Exception as e:
            raise FileOperationError(f"Failed to initialize state file: {str(e)}")
            
    async def _setup_file_watchers(self) -> None:
        """Set up file system watchers for project directories."""
        # To be implemented with file watching functionality
        pass
        
    def get_structure(self) -> Dict[str, Any]:
        """Get project structure as a dictionary."""
        structure = {"name": self.config.name, "type": "directory", "children": []}
        
        def scan_directory(path: Path, current_dict: Dict[str, Any]) -> None:
            try:
                for item in path.iterdir():
                    # Skip hidden files and .mcp directory
                    if item.name.startswith('.'):
                        continue
                        
                    if item.is_file():
                        current_dict["children"].append({
                            "name": item.name,
                            "type": "file",
                            "size": item.stat().st_size
                        })
                    elif item.is_dir():
                        dir_dict = {
                            "name": item.name,
                            "type": "directory",
                            "children": []
                        }
                        current_dict["children"].append(dir_dict)
                        scan_directory(item, dir_dict)
                        
            except Exception as e:
                logger.error(f"Error scanning directory {path}: {str(e)}")
                
        scan_directory(Path(self.path), structure)
        return structure
        
    def get_file_content(self, relative_path: str) -> str:
        """Get content of a project file."""
        try:
            file_path = os.path.join(self.path, relative_path)
            if not os.path.exists(file_path):
                raise FileOperationError(f"File not found: {relative_path}")
                
            # Basic security check
            if not os.path.normpath(file_path).startswith(str(self.path)):
                raise FileOperationError("Invalid file path")
                
            with open(file_path, 'r') as f:
                return f.read()
                
        except Exception as e:
            raise FileOperationError(f"Failed to read file {relative_path}: {str(e)}")
            
    async def update_file(self, relative_path: str, content: str) -> None:
        """Update content of a project file."""
        try:
            file_path = os.path.join(self.path, relative_path)
            
            # Security check before touching the file system
            if not os.path.normpath(file_path).startswith(str(self.path)):
                raise FileOperationError("Invalid file path")
                
            # Create directories if needed
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
                
            with open(file_path, 'w') as f:
                f.write(content)
                
            logger.info(f"Updated file: {relative_path}")
            
        except Exception as e:
            raise FileOperationError(f"Failed to update file {relative_path}: {str(e)}")
            
    async def delete_file(self, relative_path: str) -> None:
        """Delete a project file."""
        try:
            file_path = os.path.join(self.path, relative_path)
            
            # Security check
            if not os.path.normpath(file_path).startswith(str(self.path)):
                raise FileOperationError("Invalid file path")
                
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Deleted file: {relative_path}")
            else:
                logger.warning(f"File not found: {relative_path}")
                
        except Exception as e:
            raise FileOperationError(f"Failed to delete file {relative_path}: {str(e)}")
            
    async def update_state(self, **kwargs) -> None:
        """Update project state."""
        try:
            # Update state object
            for key, value in kwargs.items():
                if hasattr(self.state, key):
                    setattr(self.state, key, value)
                    
            # Save to state file
            state_path = os.path.join(self.path, '.mcp', 'state.json')
            with open(state_path, 'w') as f:
                json.dump(self.state.dict(), f, indent=2, default=str)
                
            logger.info(f"Updated project state: {kwargs}")
            
        except Exception as e:
            raise ProjectError(f"Failed to update project state: {str(e)}")
            
    async def cleanup(self) -> None:
        """Clean up project resources."""
        try:
            # Stop file watchers
            for watcher in self._file_watchers.values():
                await watcher.stop()
                
            logger.info(f"Cleaned up project resources for {self.config.name}")
            
        except Exception as e:
            logger.error(f"Error during project cleanup: {str(e)}")
```
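
A lifecycle sketch for `ProjectContext`, assuming `ProjectConfig` (from `mcp_dev_server.utils.config`, not shown here) accepts at least the `name` and `path` fields this class reads:

```python
# Hedged sketch: the ProjectConfig constructor arguments are assumptions.
import asyncio
from mcp_dev_server.utils.config import ProjectConfig
from mcp_dev_server.project_manager.context import ProjectContext

async def main() -> None:
    config = ProjectConfig(name="demo", path="/tmp/demo-project")
    context = ProjectContext(config)
    await context.initialize()

    await context.update_file("src/main.py", "print('hello')\n")
    print(context.get_file_content("src/main.py"))
    await context.update_state(last_build_status="success")

    await context.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
```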

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/manager.py:
--------------------------------------------------------------------------------

```python
"""Docker integration for MCP Development Server."""
import docker
from typing import Dict, Any, Optional
from pathlib import Path
import yaml
import jinja2

from ..utils.logging import setup_logging
from ..utils.errors import MCPDevServerError

logger = setup_logging(__name__)

class DockerManager:
    """Manages Docker containers and environments."""
    
    def __init__(self):
        """Initialize Docker manager."""
        self.client = docker.from_env()
        self.active_containers: Dict[str, Any] = {}
        self._setup_template_environment()
        
    def _setup_template_environment(self):
        """Set up Jinja2 template environment."""
        template_dir = Path(__file__).parent / "templates"
        self.template_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(str(template_dir)),
            autoescape=jinja2.select_autoescape()
        )
        
    async def create_environment(
        self,
        name: str,
        image: str,
        project_path: str,
        env_vars: Optional[Dict[str, str]] = None,
        ports: Optional[Dict[str, str]] = None,
        volumes: Optional[Dict[str, Dict[str, str]]] = None
    ) -> str:
        """Create a new Docker environment.
        
        Args:
            name: Environment name
            image: Docker image name
            project_path: Project directory path
            env_vars: Environment variables
            ports: Port mappings
            volumes: Additional volume mappings
            
        Returns:
            str: Environment ID
        """
        try:
            # Ensure image is available
            try:
                self.client.images.get(image)
            except docker.errors.ImageNotFound:
                logger.info(f"Pulling image: {image}")
                self.client.images.pull(image)
                
            # Setup default volumes
            container_volumes = {
                project_path: {
                    "bind": "/workspace",
                    "mode": "rw"
                }
            }
            if volumes:
                container_volumes.update(volumes)
                
            # Create container
            container = self.client.containers.run(
                image=image,
                name=f"mcp-env-{name}",
                detach=True,
                volumes=container_volumes,
                environment=env_vars or {},
                ports=ports or {},
                working_dir="/workspace",
                remove=True
            )
            
            env_id = container.id
            self.active_containers[env_id] = {
                "name": name,
                "container": container,
                "status": "running"
            }
            
            logger.info(f"Created environment: {name} ({env_id})")
            return env_id
            
        except Exception as e:
            logger.error(f"Failed to create environment: {str(e)}")
            raise MCPDevServerError(f"Environment creation failed: {str(e)}")
            
    async def generate_dockerfile(
        self,
        template: str,
        variables: Dict[str, Any],
        output_path: Optional[str] = None
    ) -> str:
        """Generate Dockerfile from template.
        
        Args:
            template: Template name
            variables: Template variables
            output_path: Optional path to save Dockerfile
            
        Returns:
            str: Generated Dockerfile content
        """
        try:
            template = self.template_env.get_template(f"{template}.dockerfile")
            content = template.render(**variables)
            
            if output_path:
                with open(output_path, "w") as f:
                    f.write(content)
                    
            return content
            
        except Exception as e:
            logger.error(f"Failed to generate Dockerfile: {str(e)}")
            raise MCPDevServerError(f"Dockerfile generation failed: {str(e)}")
            
    async def create_compose_config(
        self,
        name: str,
        services: Dict[str, Any],
        output_path: Optional[str] = None
    ) -> str:
        """Create Docker Compose configuration.
        
        Args:
            name: Project name
            services: Service configurations
            output_path: Optional path to save docker-compose.yml
            
        Returns:
            str: Generated docker-compose.yml content
        """
        try:
            compose_config = {
                "version": "3.8",
                "services": services,
                "networks": {
                    "mcp-network": {
                        "driver": "bridge"
                    }
                }
            }
            
            content = yaml.dump(compose_config, default_flow_style=False)
            
            if output_path:
                with open(output_path, "w") as f:
                    f.write(content)
                    
            return content
            
        except Exception as e:
            logger.error(f"Failed to create Docker Compose config: {str(e)}")
            raise MCPDevServerError(f"Compose config creation failed: {str(e)}")
            
    async def execute_command(
        self,
        env_id: str,
        command: str,
        workdir: Optional[str] = None,
        stream: bool = False
    ) -> Dict[str, Any]:
        """Execute command in Docker environment.
        
        Args:
            env_id: Environment ID
            command: Command to execute
            workdir: Working directory
            stream: Stream output in real-time
            
        Returns:
            Dict[str, Any]: Command execution results
        """
        try:
            if env_id not in self.active_containers:
                raise MCPDevServerError(f"Environment not found: {env_id}")
                
            container = self.active_containers[env_id]["container"]
            exec_result = container.exec_run(
                command,
                workdir=workdir or "/workspace",
                stream=True
            )
            
            if stream:
                output = []
                for line in exec_result.output:
                    decoded_line = line.decode().strip()
                    output.append(decoded_line)
                    yield decoded_line
                    
                return {
                    "exit_code": exec_result.exit_code,
                    "output": output
                }
            else:
                output = []
                for line in exec_result.output:
                    output.append(line.decode().strip())
                    
                return {
                    "exit_code": exec_result.exit_code,
                    "output": output
                }
                
        except Exception as e:
            logger.error(f"Command execution failed: {str(e)}")
            raise MCPDevServerError(f"Command execution failed: {str(e)}")
            
    async def cleanup(self):
        """Clean up Docker resources."""
        try:
            for env_id, env in list(self.active_containers.items()):
                # Containers were started with remove=True, so stopping them
                # also removes them from the daemon.
                env["container"].stop()
                self.active_containers.pop(env_id, None)
                
        except Exception as e:
            logger.error(f"Docker cleanup failed: {str(e)}")
            raise MCPDevServerError(f"Docker cleanup failed: {str(e)}")
            
    def get_logs(self, env_id: str, tail: Optional[int] = None) -> str:
        """Get container logs.
        
        Args:
            env_id: Environment ID
            tail: Number of lines to return from the end
            
        Returns:
            str: Container logs
        """
        try:
            if env_id not in self.active_containers:
                raise MCPDevServerError(f"Environment not found: {env_id}")
                
            container = self.active_containers[env_id]["container"]
            return container.logs(tail=tail).decode()
            
        except Exception as e:
            logger.error(f"Failed to get logs: {str(e)}")
            raise MCPDevServerError(f"Log retrieval failed: {str(e)}")

```
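
A usage sketch for `DockerManager`. It assumes a running Docker daemon and an image whose default command keeps the container alive so that `exec_run` has something to attach to (for example, an image built from the templates in `docker/templates/`); the tag below is a placeholder:

```python
import asyncio
from mcp_dev_server.docker.manager import DockerManager

async def main() -> None:
    manager = DockerManager()
    env_id = await manager.create_environment(
        name="demo",
        image="mcp-dev-python:latest",   # hypothetical locally built dev image
        project_path="/tmp/demo-project",
        env_vars={"PYTHON_ENV": "development"},
    )
    result = await manager.execute_command(env_id, "ls /workspace")
    print(result["exit_code"], result["output"])
    print(manager.get_logs(env_id, tail=20))
    await manager.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
```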

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/project.py:
--------------------------------------------------------------------------------

```python
"""Project representation and management."""
import os
import uuid
from typing import Dict, Any, Optional, List
from pathlib import Path
import git
from pydantic import BaseModel

class ProjectConfig(BaseModel):
    """Project configuration model."""
    
    name: str
    template: str
    description: str = ""
    version: str = "0.1.0"
    
class ProjectState:
    """Project state tracking."""
    
    def __init__(self):
        """Initialize project state."""
        self.git_initialized: bool = False
        self.last_build: Optional[Dict[str, Any]] = None
        self.last_test_run: Optional[Dict[str, Any]] = None
        self.active_environments: List[str] = []
        
class Project:
    """Project instance representation."""
    
    def __init__(self, path: str, config: ProjectConfig, state: ProjectState):
        """Initialize project instance.
        
        Args:
            path: Project directory path
            config: Project configuration
            state: Project state
        """
        self.id = str(uuid.uuid4())
        self.path = path
        self.config = config
        self.state = state
        
    def get_structure(self) -> Dict[str, Any]:
        """Get project directory structure.
        
        Returns:
            Dict[str, Any]: Directory structure
        """
        def scan_dir(path: Path) -> Dict[str, Any]:
            structure = {}
            
            for item in path.iterdir():
                if item.name.startswith("."):
                    continue
                    
                if item.is_file():
                    structure[item.name] = "file"
                elif item.is_dir():
                    structure[item.name] = scan_dir(item)
                    
            return structure
            
        return scan_dir(Path(self.path))
        
    def get_git_status(self) -> Dict[str, Any]:
        """Get Git repository status.
        
        Returns:
            Dict[str, Any]: Git status information
        """
        if not self.state.git_initialized:
            return {"initialized": False}
            
        try:
            repo = git.Repo(self.path)
            status = {
                "initialized": True,
                "branch": repo.active_branch.name,
                "changed_files": [item.a_path for item in repo.index.diff(None)],
                "untracked_files": repo.untracked_files
            }
            # Ahead/behind counts need an "origin/main" tracking branch;
            # report them as None when it is not available.
            try:
                status["ahead"] = sum(1 for _ in repo.iter_commits("origin/main..main"))
                status["behind"] = sum(1 for _ in repo.iter_commits("main..origin/main"))
            except Exception:
                status["ahead"] = status["behind"] = None
            return status
        except Exception as e:
            return {
                "initialized": False,
                "error": str(e)
            }
            
    async def create_git_commit(self, message: str, files: Optional[List[str]] = None) -> Dict[str, Any]:
        """Create a Git commit.
        
        Args:
            message: Commit message
            files: Optional list of files to commit
            
        Returns:
            Dict[str, Any]: Commit information
        """
        if not self.state.git_initialized:
            raise ValueError("Git is not initialized for this project")
            
        try:
            repo = git.Repo(self.path)
            
            if files:
                repo.index.add(files)
            else:
                # Stage all changes, including deletions
                repo.git.add(A=True)
                
            commit = repo.index.commit(message)
            
            return {
                "commit_id": commit.hexsha,
                "message": message,
                "author": str(commit.author),
                "files": [item.a_path for item in commit.stats.files]
            }
        except Exception as e:
            raise ValueError(f"Failed to create commit: {str(e)}")
            
    def get_dependencies(self) -> Dict[str, Any]:
        """Get project dependencies.
        
        Returns:
            Dict[str, Any]: Dependency information
        """
        dependencies = {}
        
        # Check Python dependencies
        req_file = Path(self.path) / "requirements.txt"
        if req_file.exists():
            with open(req_file, "r") as f:
                dependencies["python"] = f.read().splitlines()
                
        # Check Node.js dependencies
        package_file = Path(self.path) / "package.json"
        if package_file.exists():
            import json
            with open(package_file, "r") as f:
                package_data = json.load(f)
                dependencies["node"] = {
                    "dependencies": package_data.get("dependencies", {}),
                    "devDependencies": package_data.get("devDependencies", {})
                }
                
        return dependencies
        
    def analyze_code(self) -> Dict[str, Any]:
        """Analyze project code.
        
        Returns:
            Dict[str, Any]: Code analysis results
        """
        analysis = {
            "files": {},
            "summary": {
                "total_files": 0,
                "total_lines": 0,
                "code_lines": 0,
                "comment_lines": 0,
                "blank_lines": 0
            }
        }
        
        def analyze_file(path: Path) -> Dict[str, Any]:
            with open(path, "r", encoding="utf-8") as f:
                lines = f.readlines()
                
            total_lines = len(lines)
            blank_lines = sum(1 for line in lines if not line.strip())
            comment_lines = sum(1 for line in lines if line.strip().startswith("#"))
            code_lines = total_lines - blank_lines - comment_lines
            
            return {
                "total_lines": total_lines,
                "code_lines": code_lines,
                "comment_lines": comment_lines,
                "blank_lines": blank_lines
            }
            
        for root, _, files in os.walk(self.path):
            for file in files:
                if file.endswith(".py"):
                    file_path = Path(root) / file
                    try:
                        file_analysis = analyze_file(file_path)
                        relative_path = str(file_path.relative_to(self.path))
                        analysis["files"][relative_path] = file_analysis
                        
                        # Update summary
                        for key in ["total_lines", "code_lines", "comment_lines", "blank_lines"]:
                            analysis["summary"][key] += file_analysis[key]
                            
                        analysis["summary"]["total_files"] += 1
                    except Exception:
                        continue
                        
        return analysis
        
    def get_test_coverage(self) -> Dict[str, Any]:
        """Get test coverage information.
        
        Returns:
            Dict[str, Any]: Test coverage data
        """
        try:
            import coverage
            
            cov = coverage.Coverage()
            cov.load()
            
            return {
                "total_coverage": cov.report(),
                "missing_lines": {
                    filename: cov.analysis2(filename)[3]
                    for filename in cov.get_data().measured_files()
                },
                "branch_coverage": cov.get_option("run:branch"),
                "excluded_lines": cov.get_exclude_list()
            }
        except Exception:
            return {
                "error": "Coverage data not available"
            }
            
    def get_ci_config(self) -> Dict[str, Any]:
        """Get CI configuration.
        
        Returns:
            Dict[str, Any]: CI configuration data
        """
        ci_configs = {}
        
        # Check GitHub Actions
        github_dir = Path(self.path) / ".github" / "workflows"
        if github_dir.exists():
            ci_configs["github_actions"] = []
            for workflow in github_dir.glob("*.yml"):
                with open(workflow, "r") as f:
                    ci_configs["github_actions"].append({
                        "name": workflow.stem,
                        "config": f.read()
                    })
                    
        # Check GitLab CI
        gitlab_file = Path(self.path) / ".gitlab-ci.yml"
        if gitlab_file.exists():
            with open(gitlab_file, "r") as f:
                ci_configs["gitlab"] = f.read()
                
        return ci_configs
        
    async def cleanup(self):
        """Clean up project resources."""
        # Implementation will depend on what resources need cleanup
        pass
```
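
A sketch of constructing a `Project` record directly and reading its metadata; it uses only the classes defined in this module and assumes the placeholder directory already exists:

```python
from mcp_dev_server.project_manager.project import Project, ProjectConfig, ProjectState

config = ProjectConfig(name="demo", template="basic", description="Demo project")
project = Project(path="/tmp/demo-project", config=config, state=ProjectState())

print(project.get_structure())      # directory tree, hidden entries skipped
print(project.get_dependencies())   # requirements.txt / package.json, if present
print(project.get_git_status())     # {"initialized": False} until Git is set up
```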

--------------------------------------------------------------------------------
/src/mcp_dev_server/core/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import websockets
from typing import Callable, Any, Dict, Optional
import logging
import traceback

logger = logging.getLogger(__name__)

class Server:
    """Core server class implementing JSON-RPC 2.0 protocol."""
    
    def __init__(self, name: str):
        """Initialize the server.
        
        Args:
            name: Server name
        """
        self.name = name
        self.websocket = None
        self.input_request_handlers = {}
        self.input_response_handlers = {}
        self.initialized = False
        self.capabilities = {}
        
    async def start(self, host: str = "localhost", port: int = 8000):
        """Start the WebSocket server.
        
        Args:
            host: Host to bind to
            port: Port to listen on
        """
        async def handler(websocket, path):
            self.websocket = websocket
            try:
                logger.info(f"New WebSocket connection from {websocket.remote_address}")
                async for message in websocket:
                    response = None
                    data = None
                    try:
                        # Parse JSON-RPC message
                        data = json.loads(message)
                        if not isinstance(data, dict):
                            raise ValueError("Invalid JSON-RPC message")
                            
                        # Handle message
                        response = await self.handle_jsonrpc(data)
                        
                    except json.JSONDecodeError as e:
                        logger.error(f"JSON decode error: {str(e)}")
                        response = {
                            "jsonrpc": "2.0",
                            "error": {
                                "code": -32700,
                                "message": "Parse error",
                                "data": str(e)
                            },
                            "id": None
                        }
                        
                    except Exception as e:
                        logger.error(f"Error handling message: {str(e)}", exc_info=True)
                        response = {
                            "jsonrpc": "2.0",
                            "error": {
                                "code": -32603,
                                "message": "Internal error",
                                "data": {
                                    "error": str(e),
                                    "traceback": traceback.format_exc()
                                }
                            },
                            "id": getattr(data, "id", None) if isinstance(data, dict) else None
                        }

                    # Ensure we always send a properly formatted JSON-RPC response
                    if response:
                        try:
                            if not isinstance(response, dict):
                                response = {"result": response}
                            
                            response["jsonrpc"] = "2.0"
                            if isinstance(data, dict) and "id" in data:
                                response["id"] = data["id"]
                                
                            # Validate JSON before sending
                            response_str = json.dumps(response)
                            await websocket.send(response_str)
                            
                        except Exception as e:
                            logger.error(f"Error sending response: {str(e)}", exc_info=True)
                            error_response = {
                                "jsonrpc": "2.0",
                                "error": {
                                    "code": -32603,
                                    "message": "Error sending response",
                                    "data": str(e)
                                },
                                "id": data.get("id") if isinstance(data, dict) else None
                            }
                            await websocket.send(json.dumps(error_response))
                        
            except websockets.exceptions.ConnectionClosed:
                logger.info("WebSocket connection closed")
            finally:
                self.websocket = None
                
        try:
            self.server = await websockets.serve(
                handler,
                host,
                port,
                ping_interval=20,
                ping_timeout=20
            )
            logger.info(f"Server started on ws://{host}:{port}")
        except Exception as e:
            logger.error(f"Failed to start server: {str(e)}", exc_info=True)
            raise
        
    async def handle_jsonrpc(self, data: Dict) -> Optional[Dict]:
        """Handle JSON-RPC message.
        
        Args:
            data: Parsed JSON-RPC message
            
        Returns:
            Optional response message
        """
        try:
            method = data.get("method")
            params = data.get("params", {})
            
            logger.info(f"Handling method: {method} with params: {params}")
            
            if method == "initialize":
                self.capabilities = params.get("capabilities", {})
                self.initialized = True
                return {
                    "result": {
                        "capabilities": self.capabilities
                    }
                }
                
            if not self.initialized:
                return {
                    "error": {
                        "code": -32002,
                        "message": "Server not initialized"
                    }
                }
                
            if method == "input/request":
                handler = self.input_request_handlers.get("input_request")
                if handler:
                    try:
                        result = await handler(
                            params.get("type", ""),
                            params.get("context", {})
                        )
                        return {"result": result}
                    except Exception as e:
                        logger.error(f"Error in input request handler: {str(e)}", exc_info=True)
                        return {
                            "error": {
                                "code": -32000,
                                "message": str(e),
                                "data": {
                                    "traceback": traceback.format_exc()
                                }
                            }
                        }
                        
            elif method == "input/response":
                handler = self.input_response_handlers.get("input_response")
                if handler:
                    try:
                        await handler(params)
                        return {"result": None}
                    except Exception as e:
                        logger.error(f"Error in input response handler: {str(e)}", exc_info=True)
                        return {
                            "error": {
                                "code": -32000,
                                "message": str(e),
                                "data": {
                                    "traceback": traceback.format_exc()
                                }
                            }
                        }
                        
            return {
                "error": {
                    "code": -32601,
                    "message": f"Method not found: {method}"
                }
            }
            
        except Exception as e:
            logger.error(f"Error in handle_jsonrpc: {str(e)}", exc_info=True)
            return {
                "error": {
                    "code": -32603,
                    "message": "Internal error",
                    "data": {
                        "error": str(e),
                        "traceback": traceback.format_exc()
                    }
                }
            }
                
    def request_input(self) -> Callable:
        """Decorator for input request handlers."""
        def decorator(func: Callable) -> Callable:
            self.input_request_handlers["input_request"] = func
            return func
        return decorator
        
    def handle_input(self) -> Callable:
        """Decorator for input response handlers."""
        def decorator(func: Callable) -> Callable:
            self.input_response_handlers["input_response"] = func
            return func
        return decorator
```
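
A minimal usage sketch for the WebSocket transport above (not part of the repository). The enclosing class is defined earlier in this file; `WebSocketServer` is used here only as a placeholder name, and the handlers are illustrative:

```python
import asyncio

async def main():
    server = WebSocketServer()  # placeholder name for the class defined above

    @server.request_input()
    async def on_input_request(input_type: str, context: dict):
        # Return whatever payload the connected client expects for this input type.
        return {"type": input_type, "value": "example"}

    @server.handle_input()
    async def on_input_response(params: dict):
        print(f"Received input response: {params}")

    await server.start(host="localhost", port=8000)
    await asyncio.Future()  # keep the process alive

if __name__ == "__main__":
    asyncio.run(main())
```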

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/base_project.py:
--------------------------------------------------------------------------------

```python
"""Base project class definition."""
import os
import uuid
import xml.etree.ElementTree as ET
import json
import subprocess
import tomli
from pathlib import Path
from typing import Dict, Any, Optional, List
import git

from .project_types import ProjectType, BuildSystem
from ..utils.errors import ProjectError
from ..utils.logging import setup_logging

logger = setup_logging(__name__)

class Project:
    """Base project class."""
    
    def __init__(self, path: str, config: Dict[str, Any], project_type: ProjectType):
        """Initialize project instance."""
        self.id = str(uuid.uuid4())
        self.path = path
        self.config = config
        self.project_type = project_type
        self.build_system = BuildSystem(config["build_system"])
        
    def get_dependencies(self) -> Dict[str, Any]:
        """Get project dependencies."""
        if self.build_system == BuildSystem.MAVEN:
            return self._get_maven_dependencies()
        elif self.build_system == BuildSystem.GRADLE:
            return self._get_gradle_dependencies()
        elif self.build_system in [BuildSystem.NPM, BuildSystem.YARN]:
            return self._get_node_dependencies()
        elif self.build_system == BuildSystem.POETRY:
            return self._get_poetry_dependencies()
        elif self.build_system == BuildSystem.DOTNET:
            return self._get_dotnet_dependencies()
        elif self.build_system == BuildSystem.GO:
            return self._get_go_dependencies()
        else:
            return {}

    def _get_maven_dependencies(self) -> Dict[str, Any]:
        """Get Maven project dependencies."""
        pom_path = Path(self.path) / "pom.xml"
        if not pom_path.exists():
            return {}

        try:
            tree = ET.parse(pom_path)
            root = tree.getroot()
            ns = {'maven': 'http://maven.apache.org/POM/4.0.0'}
            
            dependencies = []
            for dep in root.findall('.//maven:dependency', ns):
                dependencies.append({
                    'groupId': dep.find('maven:groupId', ns).text,
                    'artifactId': dep.find('maven:artifactId', ns).text,
                    'version': dep.find('maven:version', ns).text if dep.find('maven:version', ns) is not None else None,
                    'scope': dep.find('maven:scope', ns).text if dep.find('maven:scope', ns) is not None else 'compile'
                })
                
            return {'maven': dependencies}
        except Exception as e:
            logger.error(f"Error parsing Maven dependencies: {e}")
            return {}

    def _get_node_dependencies(self) -> Dict[str, Any]:
        """Get Node.js project dependencies."""
        package_path = Path(self.path) / "package.json"
        if not package_path.exists():
            return {}

        try:
            with open(package_path) as f:
                package_data = json.load(f)
                return {
                    'dependencies': package_data.get('dependencies', {}),
                    'devDependencies': package_data.get('devDependencies', {})
                }
        except Exception as e:
            logger.error(f"Error parsing Node.js dependencies: {e}")
            return {}

    def _get_poetry_dependencies(self) -> Dict[str, Any]:
        """Get Poetry project dependencies."""
        pyproject_path = Path(self.path) / "pyproject.toml"
        if not pyproject_path.exists():
            return {}

        try:
            with open(pyproject_path, "rb") as f:
                pyproject_data = tomli.load(f)
                tool_poetry = pyproject_data.get('tool', {}).get('poetry', {})
                return {
                    'dependencies': tool_poetry.get('dependencies', {}),
                    'dev-dependencies': tool_poetry.get('dev-dependencies', {})
                }
        except Exception as e:
            logger.error(f"Error parsing Poetry dependencies: {e}")
            return {}

    def _get_dotnet_dependencies(self) -> Dict[str, Any]:
        """Get .NET project dependencies."""
        try:
            # Find all .csproj files
            csproj_files = list(Path(self.path).glob("**/*.csproj"))
            dependencies = {}
            
            for csproj in csproj_files:
                tree = ET.parse(csproj)
                root = tree.getroot()
                project_deps = []
                
                for item_group in root.findall('.//PackageReference'):
                    project_deps.append({
                        'Include': item_group.get('Include'),
                        'Version': item_group.get('Version')
                    })
                    
                dependencies[csproj.stem] = project_deps
                
            return dependencies
        except Exception as e:
            logger.error(f"Error parsing .NET dependencies: {e}")
            return {}

    def _get_go_dependencies(self) -> Dict[str, Any]:
        """Get Go project dependencies."""
        go_mod_path = Path(self.path) / "go.mod"
        if not go_mod_path.exists():
            return {}

        try:
            result = subprocess.run(
                ['go', 'list', '-m', 'all'],
                capture_output=True,
                text=True,
                cwd=self.path
            )
            if result.returncode == 0:
                dependencies = []
                for line in result.stdout.splitlines()[1:]:  # Skip first line (module name)
                    parts = line.split()
                    if len(parts) >= 2:
                        dependencies.append({
                            'module': parts[0],
                            'version': parts[1]
                        })
                return {'modules': dependencies}
            return {}
        except Exception as e:
            logger.error(f"Error parsing Go dependencies: {e}")
            return {}

    async def update_dependencies(self, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Update project dependencies."""
        if self.build_system == BuildSystem.MAVEN:
            cmd = "mvn versions:use-latest-versions"
        elif self.build_system == BuildSystem.GRADLE:
            cmd = "./gradlew dependencyUpdates"
        elif self.build_system == BuildSystem.NPM:
            cmd = "npm update"
        elif self.build_system == BuildSystem.YARN:
            cmd = "yarn upgrade"
        elif self.build_system == BuildSystem.POETRY:
            cmd = "poetry update"
        elif self.build_system == BuildSystem.DOTNET:
            cmd = "dotnet restore"
        else:
            raise ProjectError(f"Dependency updates not supported for {self.build_system}")
            
        return await self.execute_command(cmd)

    async def get_project_analysis(self) -> Dict[str, Any]:
        """Get project analysis results."""
        analysis = {
            "structure": self.get_structure(),
            "dependencies": self.get_dependencies(),
            "metadata": {
                "name": self.config["name"],
                "type": self.project_type.name,
                "build_system": self.build_system.value,
                "config": self.config
            }
        }

        # Add Git information if available
        git_info = self.get_git_status()
        if git_info.get("initialized", False):
            analysis["git"] = git_info

        # Add build/test status if available
        if hasattr(self, 'last_build'):
            analysis["last_build"] = self.last_build
        if hasattr(self, 'last_test_run'):
            analysis["last_test_run"] = self.last_test_run

        return analysis

    def get_structure(self) -> Dict[str, Any]:
        """Get project structure."""
        def scan_dir(path: Path) -> Dict[str, Any]:
            structure = {}
            ignore_patterns = ['.git', '__pycache__', 'node_modules', 'target', 'build']
            
            for item in path.iterdir():
                if item.name in ignore_patterns:
                    continue
                    
                if item.is_file():
                    structure[item.name] = {
                        "type": "file",
                        "size": item.stat().st_size
                    }
                elif item.is_dir():
                    structure[item.name] = {
                        "type": "directory",
                        "contents": scan_dir(item)
                    }
                    
            return structure
            
        return scan_dir(Path(self.path))

    async def cleanup(self):
        """Clean up project resources."""
        try:
            # Clean build artifacts
            if self.build_system == BuildSystem.MAVEN:
                await self.execute_command("mvn clean")
            elif self.build_system == BuildSystem.GRADLE:
                await self.execute_command("./gradlew clean")
            elif self.build_system == BuildSystem.NPM:
                await self.execute_command("npm run clean")

            logger.info(f"Cleaned up project: {self.config['name']}")
        except Exception as e:
            logger.error(f"Project cleanup failed: {e}")
            raise ProjectError(f"Cleanup failed: {str(e)}")

```
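
A quick sketch of how `Project` might be used (illustrative only; it assumes `PROJECT_TYPES` contains a `"node"` entry and that `"npm"` is a valid `BuildSystem` value):

```python
from mcp_dev_server.project_manager.base_project import Project
from mcp_dev_server.project_manager.project_types import PROJECT_TYPES

# The path and config values are examples; "name" and "build_system" are the
# keys read by Project above.
config = {"name": "demo-app", "build_system": "npm"}
project = Project("/path/to/demo-app", config, PROJECT_TYPES["node"])

print(project.get_dependencies())  # e.g. {'dependencies': {...}, 'devDependencies': {...}}
print(project.get_structure())     # nested dict describing files and directories
```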

--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/workflow.py:
--------------------------------------------------------------------------------

```python
"""Development workflow management for environments."""
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import asyncio

from ..utils.logging import setup_logging
from ..utils.errors import WorkflowError

logger = setup_logging(__name__)

class TaskStatus(str, Enum):
    """Workflow task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

class Task:
    """Represents a workflow task."""
    
    def __init__(
        self,
        name: str,
        command: str,
        environment: str,
        dependencies: Optional[List[str]] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0,
        on_success: Optional[Callable] = None,
        on_failure: Optional[Callable] = None
    ):
        self.name = name
        self.command = command
        self.environment = environment
        self.dependencies = dependencies or []
        self.timeout = timeout
        self.retry_count = retry_count
        self.status = TaskStatus.PENDING
        self.result: Optional[Dict[str, Any]] = None
        self.on_success = on_success
        self.on_failure = on_failure
        self.attempts = 0

class Workflow:
    """Manages development workflows."""
    
    def __init__(self, env_manager):
        self.env_manager = env_manager
        self.tasks: Dict[str, Task] = {}
        self.running = False
        
    def add_task(self, task: Task) -> None:
        """Add a task to the workflow."""
        self.tasks[task.name] = task
        
    def remove_task(self, task_name: str) -> None:
        """Remove a task from the workflow."""
        if task_name in self.tasks:
            del self.tasks[task_name]
            
    async def execute(self) -> Dict[str, Any]:
        """Execute the workflow."""
        try:
            self.running = True
            results = {}
            
            # Build dependency graph
            graph = self._build_dependency_graph()
            
            # Execute tasks in order
            for task_group in graph:
                # Execute tasks in group concurrently
                tasks = [self._execute_task(task_name) for task_name in task_group]
                group_results = await asyncio.gather(*tasks, return_exceptions=True)
                
                # Process results
                for task_name, result in zip(task_group, group_results):
                    if isinstance(result, Exception):
                        self.tasks[task_name].status = TaskStatus.FAILED
                        results[task_name] = {
                            "status": TaskStatus.FAILED,
                            "error": str(result)
                        }
                    else:
                        results[task_name] = result
                
            return results
            
        except Exception as e:
            raise WorkflowError(f"Workflow execution failed: {str(e)}")
        finally:
            self.running = False
            
    async def _execute_task(self, task_name: str) -> Dict[str, Any]:
        """Execute a single task."""
        task = self.tasks[task_name]
        
        # Check dependencies
        for dep in task.dependencies:
            dep_task = self.tasks.get(dep)
            if not dep_task or dep_task.status != TaskStatus.COMPLETED:
                task.status = TaskStatus.SKIPPED
                return {
                    "status": TaskStatus.SKIPPED,
                    "reason": f"Dependency {dep} not satisfied"
                }
        
        task.status = TaskStatus.RUNNING
        task.attempts += 1
        
        try:
            # Execute the command
            result = await asyncio.wait_for(
                self.env_manager.execute_in_environment(
                    task.environment,
                    task.command
                ),
                timeout=task.timeout
            )
            
            # Handle execution result
            if result['exit_code'] == 0:
                task.status = TaskStatus.COMPLETED
                if task.on_success:
                    await task.on_success(result)
                return {
                    "status": TaskStatus.COMPLETED,
                    "result": result
                }
            else:
                # Handle retry logic
                if task.attempts < task.retry_count + 1:
                    logger.info(f"Retrying task {task_name} (attempt {task.attempts})")
                    return await self._execute_task(task_name)
                
                task.status = TaskStatus.FAILED
                if task.on_failure:
                    await task.on_failure(result)
                return {
                    "status": TaskStatus.FAILED,
                    "result": result
                }
                
        except asyncio.TimeoutError:
            task.status = TaskStatus.FAILED
            return {
                "status": TaskStatus.FAILED,
                "error": "Task timeout"
            }
            
        except Exception as e:
            task.status = TaskStatus.FAILED
            return {
                "status": TaskStatus.FAILED,
                "error": str(e)
            }
            
    def _build_dependency_graph(self) -> List[List[str]]:
        """Build ordered list of task groups based on dependencies."""
        # Initialize variables
        graph: List[List[str]] = []
        completed = set()
        remaining = set(self.tasks.keys())
        
        while remaining:
            # Find tasks with satisfied dependencies
            group = set()
            for task_name in remaining:
                task = self.tasks[task_name]
                if all(dep in completed for dep in task.dependencies):
                    group.add(task_name)
            
            if not group:
                # Circular dependency detected
                raise WorkflowError("Circular dependency detected in workflow")
            
            # Add group to graph
            graph.append(list(group))
            completed.update(group)
            remaining.difference_update(group)
            
        return graph
        
    def get_status(self) -> Dict[str, Any]:
        """Get workflow status."""
        return {
            "running": self.running,
            "tasks": {
                name: {
                    "status": task.status,
                    "attempts": task.attempts,
                    "dependencies": task.dependencies
                }
                for name, task in self.tasks.items()
            }
        }
        
    def reset(self) -> None:
        """Reset workflow state."""
        for task in self.tasks.values():
            task.status = TaskStatus.PENDING
            task.attempts = 0
            task.result = None
        self.running = False

# Example workflow definitions for common development tasks
class CommonWorkflows:
    """Predefined development workflows."""
    
    @staticmethod
    def create_build_workflow(env_manager, environment: str) -> Workflow:
        """Create a standard build workflow."""
        workflow = Workflow(env_manager)
        
        # Install dependencies
        workflow.add_task(Task(
            name="install_deps",
            command="npm install",
            environment=environment,
            retry_count=2
        ))
        
        # Run linter
        workflow.add_task(Task(
            name="lint",
            command="npm run lint",
            environment=environment,
            dependencies=["install_deps"]
        ))
        
        # Run tests
        workflow.add_task(Task(
            name="test",
            command="npm run test",
            environment=environment,
            dependencies=["install_deps"]
        ))
        
        # Build
        workflow.add_task(Task(
            name="build",
            command="npm run build",
            environment=environment,
            dependencies=["lint", "test"]
        ))
        
        return workflow
        
    @staticmethod
    def create_test_workflow(env_manager, environment: str) -> Workflow:
        """Create a standard test workflow."""
        workflow = Workflow(env_manager)
        
        # Install test dependencies
        workflow.add_task(Task(
            name="install_test_deps",
            command="npm install --only=dev",
            environment=environment,
            retry_count=2
        ))
        
        # Run unit tests
        workflow.add_task(Task(
            name="unit_tests",
            command="npm run test:unit",
            environment=environment,
            dependencies=["install_test_deps"]
        ))
        
        # Run integration tests
        workflow.add_task(Task(
            name="integration_tests",
            command="npm run test:integration",
            environment=environment,
            dependencies=["install_test_deps"]
        ))
        
        # Generate coverage report
        workflow.add_task(Task(
            name="coverage",
            command="npm run coverage",
            environment=environment,
            dependencies=["unit_tests", "integration_tests"]
        ))
        
        return workflow
```
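
A short sketch of composing and running one of these workflows (illustrative; `env_manager` is assumed to expose the `execute_in_environment(environment, command)` coroutine used above, and `"node-env"` is an example environment name):

```python
from mcp_dev_server.environments.workflow import CommonWorkflows, Task

async def run_build(env_manager):
    workflow = CommonWorkflows.create_build_workflow(env_manager, environment="node-env")

    # Add a custom step that only runs after the build task succeeds.
    workflow.add_task(Task(
        name="deploy",
        command="npm run deploy",
        environment="node-env",
        dependencies=["build"],
        timeout=600,
    ))

    results = await workflow.execute()
    print(workflow.get_status())
    return results
```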

--------------------------------------------------------------------------------
/src/mcp_dev_server/environments/tools.py:
--------------------------------------------------------------------------------

```python
"""Development tools integration for environments."""
import textwrap
from typing import Dict, Optional, Any
from pathlib import Path

from ..utils.logging import setup_logging
from ..utils.errors import ToolError

logger = setup_logging(__name__)

class ToolManager:
    """Manages development tools in environments."""
    
    def __init__(self, env_manager):
        self.env_manager = env_manager
        
    async def setup_package_manager(
        self,
        environment: str,
        package_manager: str,
        config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Set up package manager in an environment."""
        try:
            config = config or {}
            
            if package_manager == "npm":
                return await self._setup_npm(environment, config)
            elif package_manager == "pip":
                return await self._setup_pip(environment, config)
            else:
                raise ToolError(f"Unsupported package manager: {package_manager}")
                
        except Exception as e:
            raise ToolError(f"Failed to setup package manager: {str(e)}")
            
    async def setup_build_tool(
        self,
        environment: str,
        build_tool: str,
        config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Set up build tool in an environment."""
        try:
            config = config or {}
            
            if build_tool == "webpack":
                return await self._setup_webpack(environment, config)
            elif build_tool == "vite":
                return await self._setup_vite(environment, config)
            else:
                raise ToolError(f"Unsupported build tool: {build_tool}")
                
        except Exception as e:
            raise ToolError(f"Failed to setup build tool: {str(e)}")
            
    async def setup_test_framework(
        self,
        environment: str,
        test_framework: str,
        config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Set up testing framework in an environment."""
        try:
            config = config or {}
            
            if test_framework == "jest":
                return await self._setup_jest(environment, config)
            elif test_framework == "pytest":
                return await self._setup_pytest(environment, config)
            else:
                raise ToolError(f"Unsupported test framework: {test_framework}")
                
        except Exception as e:
            raise ToolError(f"Failed to setup test framework: {str(e)}")
            
    async def _setup_npm(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up NPM package manager."""
        try:
            # Initialize package.json if needed
            if not config.get('skip_init'):
                result = await self.env_manager.execute_in_environment(
                    environment,
                    'npm init -y'
                )
                if result['exit_code'] != 0:
                    raise ToolError(f"npm init failed: {result['error']}")
            
            # Install dependencies if specified
            if deps := config.get('dependencies'):
                deps_str = ' '.join(deps)
                result = await self.env_manager.execute_in_environment(
                    environment,
                    f'npm install {deps_str}'
                )
                if result['exit_code'] != 0:
                    raise ToolError(f"npm install failed: {result['error']}")
                    
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"NPM setup failed: {str(e)}")
            
    async def _setup_pip(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up Pip package manager."""
        try:
            # Create virtual environment if needed
            if not config.get('skip_venv'):
                result = await self.env_manager.execute_in_environment(
                    environment,
                    'python -m venv .venv'
                )
                if result['exit_code'] != 0:
                    raise ToolError(f"venv creation failed: {result['error']}")
            
            # Install dependencies if specified
            if deps := config.get('dependencies'):
                deps_str = ' '.join(deps)
                result = await self.env_manager.execute_in_environment(
                    environment,
                    f'pip install {deps_str}'
                )
                if result['exit_code'] != 0:
                    raise ToolError(f"pip install failed: {result['error']}")
                    
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"Pip setup failed: {str(e)}")
            
    async def _setup_webpack(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up Webpack build tool."""
        try:
            # Install webpack and dependencies
            result = await self.env_manager.execute_in_environment(
                environment,
                'npm install webpack webpack-cli --save-dev'
            )
            if result['exit_code'] != 0:
                raise ToolError(f"webpack installation failed: {result['error']}")
                
            # Create webpack config if not exists
            config_content = """
            const path = require('path');
            
            module.exports = {
              entry: './src/index.js',
              output: {
                path: path.resolve(__dirname, 'dist'),
                filename: 'bundle.js'
              }
            };
            """
            
            config_path = Path(self.env_manager.environments[environment]['path']) / 'webpack.config.js'
            config_path.write_text(config_content)
            
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"Webpack setup failed: {str(e)}")
            
    async def _setup_vite(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up Vite build tool."""
        try:
            # Install vite
            result = await self.env_manager.execute_in_environment(
                environment,
                'npm install vite --save-dev'
            )
            if result['exit_code'] != 0:
                raise ToolError(f"vite installation failed: {result['error']}")
                
            # Create vite config if not exists
            config_content = """
            export default {
              root: 'src',
              build: {
                outDir: '../dist'
              }
            }
            """
            
            config_path = Path(self.env_manager.environments[environment]['path']) / 'vite.config.js'
            config_path.write_text(config_content)
            
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"Vite setup failed: {str(e)}")
            
    async def _setup_jest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up Jest testing framework."""
        try:
            # Install jest and dependencies
            result = await self.env_manager.execute_in_environment(
                environment,
                'npm install jest @types/jest --save-dev'
            )
            if result['exit_code'] != 0:
                raise ToolError(f"jest installation failed: {result['error']}")
                
            # Create jest config if not exists
            config_content = """
            module.exports = {
              testEnvironment: 'node',
              testMatch: ['**/*.test.js'],
              collectCoverage: true
            };
            """
            
            config_path = Path(self.env_manager.environments[environment]['path']) / 'jest.config.js'
            config_path.write_text(config_content)
            
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"Jest setup failed: {str(e)}")
            
    async def _setup_pytest(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up Pytest testing framework."""
        try:
            # Install pytest and dependencies
            result = await self.env_manager.execute_in_environment(
                environment,
                'pip install pytest pytest-cov'
            )
            if result['exit_code'] != 0:
                raise ToolError(f"pytest installation failed: {result['error']}")
                
            # Create pytest config if not exists
            config_content = """
            [pytest]
            testpaths = tests
            python_files = test_*.py
            addopts = --cov=src
            """
            
            config_path = Path(self.env_manager.environments[environment]['path']) / 'pytest.ini'
            config_path.write_text(config_content)
            
            return {"status": "success"}
            
        except Exception as e:
            raise ToolError(f"Pytest setup failed: {str(e)}")

```
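
Example usage of `ToolManager` (a sketch; `env_manager` and the `"node-env"` environment name are assumptions, and the dependency list is illustrative):

```python
from mcp_dev_server.environments.tools import ToolManager

async def bootstrap_node_env(env_manager):
    tools = ToolManager(env_manager)

    await tools.setup_package_manager("node-env", "npm", {"dependencies": ["express"]})
    await tools.setup_build_tool("node-env", "vite")
    await tools.setup_test_framework("node-env", "jest")
```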

--------------------------------------------------------------------------------
/src/mcp_dev_server/project_manager/manager.py:
--------------------------------------------------------------------------------

```python
"""Project management system for MCP Development Server."""
import asyncio
import json
from pathlib import Path
from typing import Dict, Any, Optional, List
import git

from .project_types import PROJECT_TYPES, ProjectType, BuildSystem
from .templates import TemplateManager
from ..prompts.project_templates import PROJECT_TEMPLATES
from ..utils.logging import setup_logging
from ..utils.errors import ProjectError
from ..docker.manager import DockerManager

logger = setup_logging(__name__)

class ProjectManager:
    """Manages development projects."""
    
    def __init__(self, config):
        """Initialize project manager.
        
        Args:
            config: Server configuration instance
        """
        self.config = config
        self.template_manager = TemplateManager()
        self.docker_manager = DockerManager()
        self.current_project = None
        self.projects = {}
        
    def get_available_project_types(self) -> Dict[str, Dict[str, Any]]:
        """Get list of available project types.
        
        Returns:
            Dict[str, Dict[str, Any]]: Project type information
        """
        return {
            name: {
                "name": pt.name,
                "description": pt.description,
                "build_systems": [bs.value for bs in pt.build_systems],
                "default_build_system": pt.default_build_system.value
            }
            for name, pt in PROJECT_TYPES.items()
        }
        
    async def create_project(
        self,
        name: str,
        project_type: str,
        project_config: Dict[str, Any],
        path: Optional[str] = None,
        description: str = ""
    ) -> Any:
        """Create a new project.
        
        Args:
            name: Project name
            project_type: Type of project (e.g., java, dotnet, node)
            project_config: Project-specific configuration
            path: Project directory path (optional)
            description: Project description
            
        Returns:
            Project instance
        """
        try:
            if project_type not in PROJECT_TYPES:
                raise ProjectError(f"Unsupported project type: {project_type}")
                
            project_type_info = PROJECT_TYPES[project_type]
            
            # Determine project path
            if not path:
                projects_dir = Path(self.config.get("projectsDir"))
                path = str(projects_dir / name)
                
            project_path = Path(path)
            if project_path.exists():
                raise ProjectError(f"Project path already exists: {path}")
                
            # Create project directory
            project_path.mkdir(parents=True, exist_ok=True)
            
            # Create project configuration
            project_config.update({
                "name": name,
                "type": project_type,
                "description": description,
                "build_system": project_config.get("build_system", 
                    project_type_info.default_build_system.value)
            })
            
            # Save project configuration
            config_path = project_path / "project.json"
            with open(config_path, "w") as f:
                json.dump(project_config, f, indent=2)
                
            # Create project structure
            await self._create_project_structure(project_path, project_type_info)
            
            # Initialize build system
            await self._initialize_build_system(
                project_path, 
                project_type_info, 
                project_config
            )
            
            # Set up Docker environment if requested
            if project_config.get("setup_docker", False):
                await self._setup_docker_environment(
                    project_path,
                    project_type_info,
                    project_config
                )
                
            # Initialize Git repository if requested
            if project_config.get("initialize_git", True):
                repo = git.Repo.init(path)
                repo.index.add("*")
                repo.index.commit("Initial commit")
                
            # Create project instance
            project = await self._create_project_instance(
                path,
                project_config,
                project_type_info
            )
            
            # Store project reference
            self.projects[project.id] = project
            self.current_project = project
            
            logger.info(f"Created {project_type} project: {name} at {path}")
            return project
            
        except Exception as e:
            logger.error(f"Failed to create project: {str(e)}")
            raise ProjectError(f"Project creation failed: {str(e)}")
            
    async def _create_project_structure(
        self,
        project_path: Path,
        project_type: ProjectType
    ):
        """Create project directory structure.
        
        Args:
            project_path: Project directory path
            project_type: Project type information
        """
        def create_directory_structure(base_path: Path, structure: Dict[str, Any]):
            for name, content in structure.items():
                path = base_path / name
                if isinstance(content, dict):
                    path.mkdir(exist_ok=True)
                    create_directory_structure(path, content)
                    
        create_directory_structure(project_path, project_type.file_structure)
        
    async def _initialize_build_system(
        self,
        project_path: Path,
        project_type: ProjectType,
        project_config: Dict[str, Any]
    ):
        """Initialize project build system.
        
        Args:
            project_path: Project directory path
            project_type: Project type information
            project_config: Project configuration
        """
        build_system = BuildSystem(project_config["build_system"])
        
        # Generate build system configuration files
        if build_system == BuildSystem.MAVEN:
            await self.template_manager.generate_maven_pom(
                project_path, project_config
            )
        elif build_system == BuildSystem.GRADLE:
            await self.template_manager.generate_gradle_build(
                project_path, project_config
            )
        elif build_system == BuildSystem.DOTNET:
            await self.template_manager.generate_dotnet_project(
                project_path, project_config
            )
        elif build_system in [BuildSystem.NPM, BuildSystem.YARN]:
            await self.template_manager.generate_package_json(
                project_path, project_config
            )
        elif build_system == BuildSystem.POETRY:
            await self.template_manager.generate_pyproject_toml(
                project_path, project_config
            )
            
    async def _setup_docker_environment(
        self,
        project_path: Path,
        project_type: ProjectType,
        project_config: Dict[str, Any]
    ):
        """Set up Docker environment for the project.
        
        Args:
            project_path: Project directory path
            project_type: Project type information
            project_config: Project configuration
        """
        # Generate Dockerfile from template
        dockerfile_template = project_type.docker_templates[0]  # Use first template
        dockerfile_content = await self.docker_manager.generate_dockerfile(
            dockerfile_template,
            project_config
        )
        
        dockerfile_path = project_path / "Dockerfile"
        with open(dockerfile_path, "w") as f:
            f.write(dockerfile_content)
            
        # Generate docker-compose.yml if needed
        if project_config.get("use_docker_compose", False):
            services = {
                "app": {
                    "build": ".",
                    "volumes": [
                        "./:/workspace"
                    ],
                    "environment": project_type.environment_variables
                }
            }
            
            compose_content = await self.docker_manager.create_compose_config(
                project_config["name"],
                services,
                project_path / "docker-compose.yml"
            )
            
    async def _create_project_instance(
        self,
        path: str,
        config: Dict[str, Any],
        project_type: ProjectType
    ) -> Any:
        """Create project instance based on type.
        
        Args:
            path: Project directory path
            config: Project configuration
            project_type: Project type information
            
        Returns:
            Project instance
        """
        # Import appropriate project class based on type
        if project_type.name == "java":
            from .java_project import JavaProject
            return JavaProject(path, config, project_type)
        elif project_type.name == "dotnet":
            from .dotnet_project import DotNetProject
            return DotNetProject(path, config, project_type)
        elif project_type.name == "node":
            from .node_project import NodeProject
            return NodeProject(path, config, project_type)
        elif project_type.name == "python":
            from .python_project import PythonProject
            return PythonProject(path, config, project_type)
        elif project_type.name == "golang":
            from .golang_project import GolangProject
            return GolangProject(path, config, project_type)
        else:
            from .base_project import Project
            return Project(path, config, project_type)

```
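
A usage sketch for `ProjectManager.create_project` (illustrative; `config` is assumed to be the server configuration object providing `config.get("projectsDir")`, and `"node"` is assumed to be a key in `PROJECT_TYPES` with its project class available):

```python
from mcp_dev_server.project_manager.manager import ProjectManager

async def create_demo_project(config):
    manager = ProjectManager(config)
    print(manager.get_available_project_types())

    project = await manager.create_project(
        name="demo-api",
        project_type="node",
        project_config={"setup_docker": True, "initialize_git": True},
        description="Example Node.js service",
    )
    print(project.id, project.path)
    return project
```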

--------------------------------------------------------------------------------
/src/mcp_dev_server/workflow/manager.py:
--------------------------------------------------------------------------------

```python
"""Development workflow management for MCP Development Server."""

from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
import asyncio

from ..utils.errors import WorkflowError
from ..utils.logging import setup_logging

logger = setup_logging(__name__)

class WorkflowStatus(str, Enum):
    """Workflow execution status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class WorkflowStep:
    """Individual step in a workflow."""
    
    def __init__(
        self,
        name: str,
        command: str,
        environment: str,
        depends_on: Optional[List[str]] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ):
        self.name = name
        self.command = command
        self.environment = environment
        self.depends_on = depends_on or []
        self.timeout = timeout
        self.retry_count = retry_count
        self.status = WorkflowStatus.PENDING
        self.result: Optional[Dict[str, Any]] = None
        self.attempts = 0

class WorkflowManager:
    """Manages development workflows."""
    
    def __init__(self, env_manager):
        self.env_manager = env_manager
        self.workflows: Dict[str, Dict[str, Any]] = {}
        
    async def create_workflow(
        self,
        steps: List[WorkflowStep],
        config: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create a new workflow."""
        try:
            workflow_id = f"workflow_{len(self.workflows)}"
            
            # Initialize workflow
            self.workflows[workflow_id] = {
                "steps": steps,
                "config": config or {},
                "status": WorkflowStatus.PENDING,
                "start_time": None,
                "end_time": None
            }
            
            return workflow_id
            
        except Exception as e:
            raise WorkflowError(f"Failed to create workflow: {str(e)}")
            
    async def start_workflow(self, workflow_id: str) -> None:
        """Start workflow execution."""
        try:
            if workflow := self.workflows.get(workflow_id):
                workflow["status"] = WorkflowStatus.RUNNING
                workflow["start_time"] = datetime.now()
                
                # Execute workflow steps
                asyncio.create_task(self._execute_workflow(workflow_id))
                
            else:
                raise WorkflowError(f"Workflow not found: {workflow_id}")
                
        except Exception as e:
            raise WorkflowError(f"Failed to start workflow: {str(e)}")
            
    async def _execute_workflow(self, workflow_id: str) -> None:
        """Execute workflow steps in order."""
        workflow = self.workflows[workflow_id]
        
        try:
            # Build execution graph
            graph = self._build_execution_graph(workflow["steps"])
            
            # Execute steps in dependency order
            for step_group in graph:
                results = await asyncio.gather(
                    *[self._execute_step(workflow_id, step) for step in step_group],
                    return_exceptions=True
                )
                
                # Check for failures: _execute_step records failures on the step
                # itself rather than raising, so inspect step statuses as well.
                if any(isinstance(r, Exception) for r in results) or any(
                    step.status == WorkflowStatus.FAILED for step in step_group
                ):
                    workflow["status"] = WorkflowStatus.FAILED
                    return
                    
            workflow["status"] = WorkflowStatus.COMPLETED
            
        except Exception as e:
            logger.error(f"Workflow execution error: {str(e)}")
            workflow["status"] = WorkflowStatus.FAILED
            workflow["error"] = str(e)
            
        finally:
            workflow["end_time"] = datetime.now()
            
    async def _execute_step(
        self,
        workflow_id: str,
        step: WorkflowStep
    ) -> None:
        """Execute a single workflow step."""
        try:
            step.status = WorkflowStatus.RUNNING
            step.attempts += 1
            
            # Execute step command
            result = await asyncio.wait_for(
                self.env_manager.execute_in_environment(
                    step.environment,
                    step.command
                ),
                timeout=step.timeout
            )
            
            # Handle step result
            success = result["exit_code"] == 0
            step.result = {
                "output": result["output"],
                "error": result.get("error"),
                "exit_code": result["exit_code"]
            }
            
            if success:
                step.status = WorkflowStatus.COMPLETED
            else:
                # Handle retry logic
                if step.attempts < step.retry_count + 1:
                    logger.info(f"Retrying step {step.name} (attempt {step.attempts})")
                    return await self._execute_step(workflow_id, step)
                step.status = WorkflowStatus.FAILED
                
        except asyncio.TimeoutError:
            step.status = WorkflowStatus.FAILED
            step.result = {
                "error": "Step execution timed out"
            }
        except Exception as e:
            step.status = WorkflowStatus.FAILED
            step.result = {
                "error": str(e)
            }
            
    def _build_execution_graph(
        self,
        steps: List[WorkflowStep]
    ) -> List[List[WorkflowStep]]:
        """Build ordered list of step groups based on dependencies."""
        # Initialize variables
        graph: List[List[WorkflowStep]] = []
        completed = set()
        remaining = set(step.name for step in steps)
        steps_by_name = {step.name: step for step in steps}
        
        while remaining:
            # Find steps with satisfied dependencies
            group = set()
            for step_name in remaining:
                step = steps_by_name[step_name]
                if all(dep in completed for dep in step.depends_on):
                    group.add(step_name)
            
            if not group:
                # Circular dependency detected
                raise WorkflowError("Circular dependency detected in workflow steps")
            
            # Add group to graph
            graph.append([steps_by_name[name] for name in group])
            completed.update(group)
            remaining.difference_update(group)
            
        return graph
        
    async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """Get status and results of a workflow."""
        if workflow := self.workflows.get(workflow_id):
            return {
                "id": workflow_id,
                "status": workflow["status"],
                "steps": [
                    {
                        "name": step.name,
                        "status": step.status,
                        "result": step.result,
                        "attempts": step.attempts
                    }
                    for step in workflow["steps"]
                ],
                "start_time": workflow["start_time"],
                "end_time": workflow["end_time"],
                "error": workflow.get("error")
            }
        raise WorkflowError(f"Workflow not found: {workflow_id}")

    def get_common_workflows(self) -> Dict[str, List[WorkflowStep]]:
        """Get predefined common workflow templates."""
        return {
            "build": [
                WorkflowStep(
                    name="install",
                    command="npm install",
                    environment="default"
                ),
                WorkflowStep(
                    name="lint",
                    command="npm run lint",
                    environment="default",
                    depends_on=["install"]
                ),
                WorkflowStep(
                    name="test",
                    command="npm test",
                    environment="default", 
                    depends_on=["install"]
                ),
                WorkflowStep(
                    name="build",
                    command="npm run build",
                    environment="default",
                    depends_on=["lint", "test"]
                )
            ],
            "test": [
                WorkflowStep(
                    name="install_deps",
                    command="npm install",
                    environment="default"
                ),
                WorkflowStep(
                    name="unit_tests",
                    command="npm run test:unit",
                    environment="default",
                    depends_on=["install_deps"]
                ),
                WorkflowStep(
                    name="integration_tests", 
                    command="npm run test:integration",
                    environment="default",
                    depends_on=["install_deps"]
                ),
                WorkflowStep(
                    name="coverage",
                    command="npm run coverage",
                    environment="default",
                    depends_on=["unit_tests", "integration_tests"]
                )
            ],
            "release": [
                WorkflowStep(
                    name="bump_version",
                    command="npm version patch",
                    environment="default"
                ),
                WorkflowStep(
                    name="build",
                    command="npm run build",
                    environment="default",
                    depends_on=["bump_version"]
                ),
                WorkflowStep(
                    name="publish",
                    command="npm publish",
                    environment="default",
                    depends_on=["build"]
                )
            ]
        }
```
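
A sketch of driving `WorkflowManager` with one of its predefined templates (illustrative; `env_manager` must provide the `execute_in_environment` coroutine, and a `"default"` environment is assumed to exist):

```python
import asyncio

from mcp_dev_server.workflow.manager import WorkflowManager

async def run_build_workflow(env_manager):
    manager = WorkflowManager(env_manager)
    steps = manager.get_common_workflows()["build"]

    workflow_id = await manager.create_workflow(steps)
    await manager.start_workflow(workflow_id)

    # start_workflow runs the steps in a background task, so poll for completion.
    status = await manager.get_workflow_status(workflow_id)
    while status["status"] in ("pending", "running"):
        await asyncio.sleep(1)
        status = await manager.get_workflow_status(workflow_id)

    print(status)
    return status
```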

--------------------------------------------------------------------------------
/src/mcp_dev_server/docker/streams.py:
--------------------------------------------------------------------------------

```python
"""Container output streaming and file synchronization."""
import os
import time
import asyncio
import hashlib
import collections
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional, AsyncGenerator, Any
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from ..utils.logging import setup_logging
from ..utils.errors import StreamError, SyncError

logger = setup_logging(__name__)

class OutputFormat(str, Enum):
    """Output stream formats."""
    STDOUT = "stdout"
    STDERR = "stderr"
    COMBINED = "combined"
    FORMATTED = "formatted"

class StreamConfig:
    """Stream configuration."""
    def __init__(
        self,
        format: OutputFormat = OutputFormat.COMBINED,
        buffer_size: int = 1024,
        filters: Optional[List[str]] = None,
        timestamp: bool = False
    ):
        self.format = format
        self.buffer_size = buffer_size
        self.filters = filters or []
        self.timestamp = timestamp

class SyncConfig:
    """Synchronization configuration."""
    def __init__(
        self,
        ignore_patterns: Optional[List[str]] = None,
        sync_interval: float = 1.0,
        atomic: bool = True
    ):
        self.ignore_patterns = ignore_patterns or []
        self.sync_interval = sync_interval
        self.atomic = atomic

class StreamInfo:
    """Information about an active stream."""
    def __init__(self, stream: AsyncGenerator[str, None], config: StreamConfig):
        self.stream = stream
        self.config = config
        self.start_time = datetime.now()

class EnhancedOutputStreamManager:
    """Enhanced streaming output manager."""
    
    def __init__(self, docker_manager):
        self.docker_manager = docker_manager
        self.active_streams: Dict[str, StreamInfo] = {}
        self._buffer = collections.deque(maxlen=1000)  # Keep last 1000 messages
        
    async def start_stream(
        self,
        container_name: str,
        command: str,
        config: StreamConfig,
        callback: Optional[callable] = None
    ) -> AsyncGenerator[str, None]:
        """Start enhanced output stream."""
        try:
            container = self.docker_manager.containers.get(container_name)
            if not container:
                raise StreamError(f"Container not found: {container_name}")

            # Create execution with a streamed, demultiplexed output generator.
            # (socket=True would return a raw socket rather than a generator.)
            exec_result = container.exec_run(
                command,
                stream=True,
                demux=True
            )

            async def stream_handler():
                buffer = []

                async def flush() -> str:
                    """Join, record and deliver the buffered output."""
                    output = ''.join(buffer)
                    buffer.clear()
                    self._buffer.append(output)
                    if callback:
                        await callback(output)
                    return output

                try:
                    # docker-py returns a blocking (synchronous) generator here.
                    for data in exec_result.output:
                        # Apply format and filtering
                        processed_data = self._process_stream_data(data, config)
                        if processed_data:
                            buffer.append(processed_data)
                            if sum(len(chunk) for chunk in buffer) >= config.buffer_size:
                                yield await flush()

                    # Flush any remaining output once the stream ends normally
                    # (yielding inside `finally` would break generator close()).
                    if buffer:
                        yield await flush()

                except Exception as e:
                    logger.error(f"Stream processing error: {str(e)}")
                    raise StreamError(f"Stream processing error: {str(e)}")
                finally:
                    if container_name in self.active_streams:
                        del self.active_streams[container_name]

            # Create and store stream task
            stream_task = asyncio.create_task(stream_handler())
            self.active_streams[container_name] = StreamInfo(stream_task, config)
            
            async for output in stream_task:
                yield output

        except Exception as e:
            logger.error(f"Failed to start stream: {str(e)}")
            raise StreamError(f"Failed to start stream: {str(e)}")

    def _process_stream_data(
        self,
        data: bytes,
        config: StreamConfig
    ) -> Optional[str]:
        """Process stream data according to config."""
        if not data:
            return None
            
        # Split streams if demuxed
        stdout, stderr = data if isinstance(data, tuple) else (data, None)
        
        # Apply format
        if config.format == OutputFormat.STDOUT and stdout:
            output = stdout.decode()
        elif config.format == OutputFormat.STDERR and stderr:
            output = stderr.decode()
        elif config.format == OutputFormat.COMBINED:
            output = ''
            if stdout:
                output += stdout.decode()
            if stderr:
                output += stderr.decode()
        elif config.format == OutputFormat.FORMATTED:
            output = self._format_output(stdout, stderr)
        else:
            return None
            
        # Apply filters: drop any chunk containing a filter pattern
        for filter_pattern in config.filters:
            if filter_pattern in output:
                return None
                
        # Add timestamp if requested
        if config.timestamp:
            output = f"[{datetime.now().isoformat()}] {output}"
            
        return output
        
    @staticmethod
    def _format_output(stdout: Optional[bytes], stderr: Optional[bytes]) -> str:
        """Format output with colors and prefixes."""
        output = []
        
        if stdout:
            output.append(f"\033[32m[OUT]\033[0m {stdout.decode()}")
        if stderr:
            output.append(f"\033[31m[ERR]\033[0m {stderr.decode()}")
            
        return '\n'.join(output)

    async def stop_stream(self, container_name: str) -> None:
        """Stop streaming from a container."""
        if stream_info := self.active_streams.get(container_name):
            stream_info.task.cancel()
            try:
                await stream_info.task
            except asyncio.CancelledError:
                pass
            # The handler's finally block may already have removed the entry.
            self.active_streams.pop(container_name, None)

class BiDirectionalSync:
    """Enhanced bi-directional file synchronization."""
    
    def __init__(self, docker_manager):
        self.docker_manager = docker_manager
        self.sync_handlers: Dict[str, EnhancedSyncHandler] = {}
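        # A single watchdog observer services the host-side watches for all containers.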
        self.observer = Observer()
        self.observer.start()
        
    async def start_sync(
        self,
        container_name: str,
        host_path: str,
        container_path: str,
        config: SyncConfig
    ) -> None:
        """Start bi-directional file sync."""
        try:
            # Validate paths
            if not os.path.exists(host_path):
                raise SyncError(f"Host path does not exist: {host_path}")
            
            container = self.docker_manager.containers.get(container_name)
            if not container:
                raise SyncError(f"Container not found: {container_name}")
            
            # Create sync handler
            handler = EnhancedSyncHandler(
                container=container,
                container_path=container_path,
                host_path=host_path,
                config=config
            )
            
            # Watch the host directory; keep the watch handle so this
            # sync can be unscheduled individually later.
            handler.watch = self.observer.schedule(
                handler,
                host_path,
                recursive=True
            )
            
            # Start container file watcher
            await handler.start_container_watcher()
            
            self.sync_handlers[container_name] = handler
            logger.info(f"Started bi-directional sync for container: {container_name}")
            
        except Exception as e:
            raise SyncError(f"Failed to start sync: {str(e)}")

    async def stop_sync(self, container_name: str) -> None:
        """Stop synchronization for a container."""
        if handler := self.sync_handlers.get(container_name):
            # Unschedule only this handler's watch so other active syncs keep running.
            if handler.watch:
                self.observer.unschedule(handler.watch)
            await handler.stop_container_watcher()
            del self.sync_handlers[container_name]
            logger.info(f"Stopped sync for container: {container_name}")

    async def cleanup(self) -> None:
        """Clean up all synchronization handlers."""
        for container_name in list(self.sync_handlers.keys()):
            await self.stop_sync(container_name)
        self.observer.stop()
        self.observer.join()

class EnhancedSyncHandler(FileSystemEventHandler):
    """Enhanced sync handler with bi-directional support."""
    
    def __init__(
        self,
        container,
        container_path: str,
        host_path: str,
        config: SyncConfig
    ):
        super().__init__()
        self.container = container
        self.container_path = container_path
        self.host_path = host_path
        self.config = config
        self.sync_lock = asyncio.Lock()
        self.pending_syncs: Dict[str, float] = {}
        self.watch = None  # ObservedWatch handle, set by BiDirectionalSync.
        self._container_watcher: Optional[asyncio.Task] = None
        
    async def start_container_watcher(self) -> None:
        """Start watching container files."""
        cmd = (
            "inotifywait -m -r -e modify,create,delete,move "
            f"{self.container_path}"
        )

        # stream=True yields inotify output as it arrives; detach is not
        # used because it would discard the output stream.
        exec_result = self.container.exec_run(cmd, stream=True)

        self._container_watcher = asyncio.create_task(
            self._handle_container_events(exec_result.output)
        )
        
    async def stop_container_watcher(self) -> None:
        """Stop container file watcher."""
        if self._container_watcher:
            self._container_watcher.cancel()
            try:
                await self._container_watcher
            except asyncio.CancelledError:
                pass
            self._container_watcher = None
        
    async def _handle_container_events(self, output_stream) -> None:
        """Handle container file events."""
        try:
            loop = asyncio.get_running_loop()
            events = iter(output_stream)
            # docker-py yields blocking chunks; read them in a worker thread.
            while (chunk := await loop.run_in_executor(None, next, events, None)) is not None:
                # A chunk may contain several inotify event lines.
                for line in chunk.decode().splitlines():
                    await self._handle_container_change(line)
        except Exception as e:
            logger.error(f"Container watcher error: {str(e)}")
            
    async def _handle_container_change(self, event: str) -> None:
        """Handle container file change."""
        try:
            # Parse inotify event
            parts = event.strip().split()
            if len(parts) >= 3:
                path = parts[0]
                change_type = parts[1]
                filename = parts[2]
                
                container_path = os.path.join(path, filename)
                host_path = self._container_to_host_path(container_path)
                
                # Apply filters
                if self._should_ignore(host_path):
                    return
                    
                async with self.sync_lock:
                    # Check if change is from host sync
                    if host_path in self.pending_syncs:
                        if time.time() - self.pending_syncs[host_path] < self.config.sync_interval:
                            return
                            
                    # Sync from container to host
                    await self._sync_to_host(container_path, host_path)
                    
        except Exception as e:
            logger.error(f"Error handling container change: {str(e)}")
            
    def _container_to_host_path(self, container_path: str) -> str:
        """Convert container path to host path."""
        rel_path = os.path.relpath(container_path, self.container_path)
        return os.path.join(self.host_path, rel_path)

    def _should_ignore(self, path: str) -> bool:
        """Check if path should be ignored."""
        return any(pattern in path for pattern in self.config.ignore_patterns)
        
    async def _sync_to_host(
        self,
        container_path: str,
        host_path: str
    ) -> None:
        """Sync file from container to host."""
        try:
            # get_archive returns a tar stream; extract the file content
            # instead of writing the raw tar bytes to disk.
            stream, _stat = self.container.get_archive(container_path)
            archive = io.BytesIO(b''.join(stream))
            with tarfile.open(fileobj=archive) as tar:
                member = tar.getmembers()[0]
                extracted = tar.extractfile(member)
                content = extracted.read() if extracted else b''

            # Create parent directories
            os.makedirs(os.path.dirname(host_path), exist_ok=True)

            if self.config.atomic:
                # Write to a temporary file, then atomically rename into place
                tmp_path = f"{host_path}.tmp"
                with open(tmp_path, 'wb') as f:
                    f.write(content)
                os.replace(tmp_path, host_path)
            else:
                # Direct write
                with open(host_path, 'wb') as f:
                    f.write(content)

            # Update sync tracking
            self.pending_syncs[host_path] = time.time()

        except Exception as e:
            logger.error(f"Error syncing to host: {str(e)}")
            raise SyncError(f"Failed to sync file {container_path}: {str(e)}")
```
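
A minimal usage sketch of the two classes above. It assumes a `docker_manager` whose `containers` attribute maps container names to docker-py `Container` objects (as the code expects); the function name `stream_build`, the container name `dev`, the command, and the paths are placeholders.

```python
from mcp_dev_server.docker.streams import (
    BiDirectionalSync,
    EnhancedOutputStreamManager,
    OutputFormat,
    StreamConfig,
    SyncConfig,
)


async def stream_build(docker_manager) -> None:
    """Sync a workspace into the 'dev' container and stream a build."""
    streams = EnhancedOutputStreamManager(docker_manager)
    sync = BiDirectionalSync(docker_manager)

    # Mirror ./workspace into the container while the command runs.
    await sync.start_sync(
        container_name="dev",
        host_path="./workspace",
        container_path="/workspace",
        config=SyncConfig(ignore_patterns=[".git", "__pycache__"]),
    )

    try:
        # Stream a build command, prefixing and timestamping flushed chunks.
        config = StreamConfig(format=OutputFormat.FORMATTED, timestamp=True)
        async for chunk in streams.start_stream("dev", "npm run build", config):
            print(chunk, end="")
    finally:
        await streams.stop_stream("dev")
        await sync.cleanup()
```

Run it with `asyncio.run(stream_build(manager))` once a real `DockerManager` instance is available; note that the ignore patterns are plain substrings, matching the `_should_ignore()` check above.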