# Directory Structure ``` ├── .gitignore ├── .python-version ├── LICENSE ├── mcp_python_interpreter │ ├── __init__.py │ ├── main.py │ └── server.py ├── pyproject.toml ├── README.md └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.10 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ test_dir/ *.egg-info .pyirc # Virtual environments .venv ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown [](https://mseep.ai/app/yzfly-mcp-python-interpreter) # MCP Python Interpreter A Model Context Protocol (MCP) server that allows LLMs to interact with Python environments, read and write files, execute Python code, and manage development workflows. ## Features - **Environment Management**: List and use different Python environments (system and conda) - **Code Execution**: Run Python code or scripts in any available environment - **Package Management**: List installed packages and install new ones - **File Operations**: - Read files of any type (text, source code, binary) - Write text and binary files - **Python Prompts**: Templates for common Python tasks like function creation and debugging ## Installation You can install the MCP Python Interpreter using pip: ```bash pip install mcp-python-interpreter ``` Or with uv: ```bash uv install mcp-python-interpreter ``` ## Usage with Claude Desktop 1. Install [Claude Desktop](https://claude.ai/download) 2. Open Claude Desktop, click on menu, then Settings 3. Go to Developer tab and click "Edit Config" 4. Add the following to your `claude_desktop_config.json`: ```json { "mcpServers": { "mcp-python-interpreter": { "command": "uvx", "args": [ "mcp-python-interpreter", "--dir", "/path/to/your/work/dir", "--python-path", "/path/to/your/python" ], "env": { "MCP_ALLOW_SYSTEM_ACCESS": 0 }, } } } ``` For Windows: ```json { "mcpServers": { "python-interpreter": { "command": "uvx", "args": [ "mcp-python-interpreter", "--dir", "C:\\path\\to\\your\\working\\directory", "--python-path", "/path/to/your/python" ], "env": { "MCP_ALLOW_SYSTEM_ACCESS": "0" }, } } } ``` 5. Restart Claude Desktop 6. You should now see the MCP tools icon in the chat interface The `--dir` parameter is **required** and specifies where all files will be saved and executed. This helps maintain security by isolating the MCP server to a specific directory. ### Prerequisites - Make sure you have `uv` installed. If not, install it using: ```bash curl -LsSf https://astral.sh/uv/install.sh | sh ``` - For Windows: ```powershell powershell -ExecutionPolicy Bypass -Command "iwr -useb https://astral.sh/uv/install.ps1 | iex" ``` ## Available Tools The Python Interpreter provides the following tools: ### Environment and Package Management - **list_python_environments**: List all available Python environments (system and conda) - **list_installed_packages**: List packages installed in a specific environment - **install_package**: Install a Python package in a specific environment ### Code Execution - **run_python_code**: Execute Python code in a specific environment - **run_python_file**: Execute a Python file in a specific environment ### File Operations - **read_file**: Read contents of any file type, with size and safety limits - Supports text files with syntax highlighting - Displays hex representation for binary files - **write_file**: Create or overwrite files with text or binary content - **write_python_file**: Create or overwrite a Python file specifically - **list_directory**: List Python files in a directory ## Available Resources - **python://environments**: List all available Python environments - **python://packages/{env_name}**: List installed packages for a specific environment - **python://file/{file_path}**: Get the content of a Python file - **python://directory/{directory_path}**: List all Python files in a directory ## Prompts - **python_function_template**: Generate a template for a Python function - **refactor_python_code**: Help refactor Python code - **debug_python_error**: Help debug a Python error ## Example Usage Here are some examples of what you can ask Claude to do with this MCP server: - "Show me all available Python environments on my system" - "Run this Python code in my conda-base environment: print('Hello, world!')" - "Create a new Python file called 'hello.py' with a function that says hello" - "Read the contents of my 'data.json' file" - "Write a new configuration file with these settings..." - "List all packages installed in my system Python environment" - "Install the requests package in my system Python environment" - "Run data_analysis.py with these arguments: --input=data.csv --output=results.csv" ## File Handling Capabilities The MCP Python Interpreter now supports comprehensive file operations: - Read text and binary files up to 1MB - Write text and binary files - Syntax highlighting for source code files - Hex representation for binary files - Strict file path security (only within the working directory) ## Security Considerations This MCP server has access to your Python environments and file system. Key security features include: - Isolated working directory - File size limits - Prevented writes outside the working directory - Explicit overwrite protection Always be cautious about running code or file operations that you don't fully understand. ## License MIT ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/main.py: -------------------------------------------------------------------------------- ```python """Main module for mcp-python-interpreter.""" from mcp_python_interpreter.server import mcp def main(): """Run the MCP Python Interpreter server.""" mcp.run(transport='stdio') if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/__init__.py: -------------------------------------------------------------------------------- ```python """ MCP Python Interpreter A Model Context Protocol server for Python code execution and environment management. """ __version__ = "1.2.3" __author__ = "YZFly" __email__ = "[email protected]" from mcp_python_interpreter.server import mcp __all__ = ["mcp"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["setuptools>=61.0", "wheel"] build-backend = "setuptools.build_meta" [project] name = "mcp-python-interpreter" version = "1.2.3" description = "MCP server for Python code execution and environment management" authors = [ {name = "YZFly", email = "[email protected]"}, ] readme = "README.md" requires-python = ">=3.10" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ] dependencies = [ "fastmcp>=2.0.0", ] [project.scripts] mcp-python-interpreter = "mcp_python_interpreter.main:main" [project.urls] "Homepage" = "https://github.com/yzfly/mcp-python-interpreter" "Bug Tracker" = "https://github.com/yzfly/mcp-python-interpreter/issues" [tool.setuptools] packages = ["mcp_python_interpreter"] ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/server.py: -------------------------------------------------------------------------------- ```python """ MCP Python Interpreter A Model Context Protocol server for interacting with Python environments and executing Python code. Supports both in-process execution (default, fast) and subprocess execution (for environment isolation). """ import os import sys import json import subprocess import tempfile import argparse import traceback import builtins from pathlib import Path from io import StringIO from typing import Dict, List, Optional, Any import asyncio import concurrent.futures # Import MCP SDK from mcp.server.fastmcp import FastMCP # Parse command line arguments parser = argparse.ArgumentParser(description='MCP Python Interpreter') parser.add_argument('--dir', type=str, default=os.getcwd(), help='Working directory for code execution and file operations') parser.add_argument('--python-path', type=str, default=None, help='Custom Python interpreter path to use as default') args, unknown = parser.parse_known_args() # Configuration ALLOW_SYSTEM_ACCESS = os.environ.get('MCP_ALLOW_SYSTEM_ACCESS', 'false').lower() in ('true', '1', 'yes') WORKING_DIR = Path(args.dir).absolute() WORKING_DIR.mkdir(parents=True, exist_ok=True) DEFAULT_PYTHON_PATH = args.python_path if args.python_path else sys.executable # Startup message print(f"MCP Python Interpreter starting in directory: {WORKING_DIR}", file=sys.stderr) print(f"Using default Python interpreter: {DEFAULT_PYTHON_PATH}", file=sys.stderr) print(f"System-wide file access: {'ENABLED' if ALLOW_SYSTEM_ACCESS else 'DISABLED'}", file=sys.stderr) print(f"Platform: {sys.platform}", file=sys.stderr) # Create MCP server mcp = FastMCP("python-interpreter") # Thread pool for subprocess fallback _executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) # ============================================================================ # REPL Session Management (for in-process execution) # ============================================================================ class ReplSession: """Manages a Python REPL session with persistent state.""" def __init__(self): self.locals = { "__builtins__": builtins, "__name__": "__main__", "__doc__": None, "__package__": None, } self.history = [] def execute(self, code: str, timeout: Optional[int] = None) -> Dict[str, Any]: """ Execute Python code in this session. Args: code: Python code to execute timeout: Optional timeout (not enforced for inline execution) Returns: Dict with stdout, stderr, result, and status """ stdout_capture = StringIO() stderr_capture = StringIO() # Save original streams old_stdout, old_stderr = sys.stdout, sys.stderr sys.stdout, sys.stderr = stdout_capture, stderr_capture result_value = None status = 0 try: # Change to working directory for execution old_cwd = os.getcwd() os.chdir(WORKING_DIR) try: # Try to evaluate as expression first try: result_value = eval(code, self.locals) if result_value is not None: print(repr(result_value)) except SyntaxError: # If not an expression, execute as statement exec(code, self.locals) except Exception: traceback.print_exc() status = 1 finally: os.chdir(old_cwd) finally: # Restore original streams sys.stdout, sys.stderr = old_stdout, old_stderr return { "stdout": stdout_capture.getvalue(), "stderr": stderr_capture.getvalue(), "result": result_value, "status": status } # Global sessions storage _sessions: Dict[str, ReplSession] = {} def get_session(session_id: str = "default") -> ReplSession: """Get or create a REPL session.""" if session_id not in _sessions: _sessions[session_id] = ReplSession() return _sessions[session_id] # ============================================================================ # Helper functions # ============================================================================ def is_path_allowed(path: Path) -> bool: """Check if a path is allowed based on security settings.""" if ALLOW_SYSTEM_ACCESS: return True try: path.resolve().relative_to(WORKING_DIR.resolve()) return True except ValueError: return False def _run_subprocess_sync( cmd: List[str], cwd: Optional[str] = None, timeout: int = 300 ) -> Dict[str, Any]: """Synchronous subprocess execution for Windows compatibility.""" try: creation_flags = 0 if sys.platform == "win32": creation_flags = subprocess.CREATE_NO_WINDOW try: creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP except AttributeError: pass result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout, creationflags=creation_flags if sys.platform == "win32" else 0, encoding='utf-8', errors='replace', stdin=subprocess.DEVNULL ) return { "stdout": result.stdout, "stderr": result.stderr, "status": result.returncode } except subprocess.TimeoutExpired as e: stdout = e.stdout.decode('utf-8', errors='replace') if e.stdout else "" stderr = e.stderr.decode('utf-8', errors='replace') if e.stderr else "" return { "stdout": stdout, "stderr": stderr + f"\nExecution timed out after {timeout} seconds", "status": -1 } except Exception as e: return { "stdout": "", "stderr": f"Error executing command: {str(e)}", "status": -1 } async def run_subprocess_async( cmd: List[str], cwd: Optional[str] = None, timeout: int = 300, input_data: Optional[str] = None ) -> Dict[str, Any]: """Run subprocess with Windows compatibility.""" # On Windows, use thread pool for reliability if sys.platform == "win32": if input_data: print("Warning: input_data not supported on Windows sync mode", file=sys.stderr) loop = asyncio.get_event_loop() from functools import partial func = partial(_run_subprocess_sync, cmd, cwd, timeout) result = await loop.run_in_executor(_executor, func) return result # On Unix, use asyncio try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE if input_data else asyncio.subprocess.DEVNULL, cwd=cwd ) try: stdout, stderr = await asyncio.wait_for( process.communicate(input=input_data.encode('utf-8') if input_data else None), timeout=timeout ) return { "stdout": stdout.decode('utf-8', errors='replace'), "stderr": stderr.decode('utf-8', errors='replace'), "status": process.returncode } except asyncio.TimeoutError: try: process.kill() await process.wait() except: pass return { "stdout": "", "stderr": f"Execution timed out after {timeout} seconds", "status": -1 } except Exception as e: return { "stdout": "", "stderr": f"Error executing command: {str(e)}", "status": -1 } async def execute_python_code_subprocess( code: str, python_path: Optional[str] = None, working_dir: Optional[str] = None, timeout: int = 300 ) -> Dict[str, Any]: """Execute Python code via subprocess (for environment isolation).""" if python_path is None: python_path = DEFAULT_PYTHON_PATH temp_file = None try: fd, temp_file = tempfile.mkstemp(suffix='.py', text=True) try: with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(code) f.flush() os.fsync(f.fileno()) except Exception as e: os.close(fd) raise e if sys.platform == "win32": await asyncio.sleep(0.05) temp_file = os.path.abspath(temp_file) if working_dir: working_dir = os.path.abspath(working_dir) result = await run_subprocess_async( [python_path, temp_file], cwd=working_dir, timeout=timeout ) return result finally: if temp_file: try: if sys.platform == "win32": await asyncio.sleep(0.05) if os.path.exists(temp_file): os.unlink(temp_file) except Exception as e: print(f"Warning: Could not delete temp file {temp_file}: {e}", file=sys.stderr) def get_python_environments() -> List[Dict[str, str]]: """Get all available Python environments.""" environments = [] if DEFAULT_PYTHON_PATH != sys.executable: try: result = subprocess.run( [DEFAULT_PYTHON_PATH, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}')"], capture_output=True, text=True, check=True, timeout=10, stdin=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) version = result.stdout.strip() environments.append({ "name": "default", "path": DEFAULT_PYTHON_PATH, "version": version }) except Exception as e: print(f"Error getting version for custom Python path: {e}", file=sys.stderr) environments.append({ "name": "system", "path": sys.executable, "version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" }) # Try conda environments try: result = subprocess.run( ["conda", "info", "--envs", "--json"], capture_output=True, text=True, check=False, timeout=10, stdin=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) if result.returncode == 0: conda_info = json.loads(result.stdout) for env in conda_info.get("envs", []): env_name = os.path.basename(env) if env_name == "base": env_name = "conda-base" python_path = os.path.join(env, "bin", "python") if not os.path.exists(python_path): python_path = os.path.join(env, "python.exe") if os.path.exists(python_path): try: version_result = subprocess.run( [python_path, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}')"], capture_output=True, text=True, check=True, timeout=10, stdin=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) version = version_result.stdout.strip() environments.append({ "name": env_name, "path": python_path, "version": version }) except Exception: pass except Exception as e: print(f"Error getting conda environments: {e}", file=sys.stderr) return environments def get_installed_packages(python_path: str) -> List[Dict[str, str]]: """Get installed packages for a specific Python environment.""" try: result = subprocess.run( [python_path, "-m", "pip", "list", "--format=json"], capture_output=True, text=True, check=True, timeout=30, stdin=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) return json.loads(result.stdout) except Exception as e: print(f"Error getting installed packages: {e}", file=sys.stderr) return [] def find_python_files(directory: Path) -> List[Dict[str, str]]: """Find all Python files in a directory.""" files = [] if not directory.exists(): return files for path in directory.rglob("*.py"): if path.is_file(): files.append({ "path": str(path), "name": path.name, "size": path.stat().st_size, "modified": path.stat().st_mtime }) return files # ============================================================================ # Resources # ============================================================================ @mcp.resource("python://environments") def get_environments_resource() -> str: """List all available Python environments as a resource.""" environments = get_python_environments() return json.dumps(environments, indent=2) @mcp.resource("python://packages/{env_name}") def get_packages_resource(env_name: str) -> str: """List installed packages for a specific environment as a resource.""" environments = get_python_environments() env = next((e for e in environments if e["name"] == env_name), None) if not env: return json.dumps({"error": f"Environment '{env_name}' not found"}) packages = get_installed_packages(env["path"]) return json.dumps(packages, indent=2) @mcp.resource("python://directory") def get_working_directory_listing() -> str: """List all Python files in the working directory as a resource.""" try: files = find_python_files(WORKING_DIR) return json.dumps({ "working_directory": str(WORKING_DIR), "files": files }, indent=2) except Exception as e: return json.dumps({"error": f"Error listing directory: {str(e)}"}) @mcp.resource("python://session/{session_id}/history") def get_session_history(session_id: str) -> str: """Get execution history for a REPL session.""" if session_id not in _sessions: return json.dumps({"error": f"Session '{session_id}' not found"}) session = _sessions[session_id] return json.dumps({ "session_id": session_id, "history": session.history }, indent=2) # ============================================================================ # Tools # ============================================================================ @mcp.tool() def list_python_environments() -> str: """List all available Python environments (system Python and conda environments).""" environments = get_python_environments() if not environments: return "No Python environments found." result = "Available Python Environments:\n\n" for env in environments: result += f"- Name: {env['name']}\n" result += f" Path: {env['path']}\n" result += f" Version: Python {env['version']}\n\n" return result @mcp.tool() def list_installed_packages(environment: str = "default") -> str: """ List installed packages for a specific Python environment. Args: environment: Name of the Python environment """ environments = get_python_environments() if environment == "default" and not any(e["name"] == "default" for e in environments): environment = "system" env = next((e for e in environments if e["name"] == environment), None) if not env: return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" packages = get_installed_packages(env["path"]) if not packages: return f"No packages found in environment '{environment}'." result = f"Installed Packages in '{environment}':\n\n" for pkg in packages: result += f"- {pkg['name']} {pkg['version']}\n" return result @mcp.tool() async def run_python_code( code: str, execution_mode: str = "inline", session_id: str = "default", environment: str = "system", save_as: Optional[str] = None, timeout: int = 300 ) -> str: """ Execute Python code with flexible execution modes. Args: code: Python code to execute execution_mode: Execution mode - "inline" (default, fast, in-process) or "subprocess" (isolated) session_id: Session ID for inline mode to maintain state across executions environment: Python environment name (only for subprocess mode) save_as: Optional filename to save the code before execution timeout: Maximum execution time in seconds (only enforced for subprocess mode) Returns: Execution result with output Execution modes: - "inline" (default): Executes code in the current process. Fast and reliable, maintains session state. Use for most code execution tasks. - "subprocess": Executes code in a separate Python process. Use when you need environment isolation or a different Python environment. """ # Save code if requested if save_as: save_path = WORKING_DIR / save_as if not save_path.suffix == '.py': save_path = save_path.with_suffix('.py') try: save_path.parent.mkdir(parents=True, exist_ok=True) with open(save_path, 'w', encoding='utf-8') as f: f.write(code) except Exception as e: return f"Error saving code to file: {str(e)}" # Execute based on mode if execution_mode == "inline": # In-process execution (default, fast, no subprocess issues) try: session = get_session(session_id) result = session.execute(code, timeout) # Store in history session.history.append({ "code": code, "stdout": result["stdout"], "stderr": result["stderr"], "status": result["status"] }) output = f"Execution in session '{session_id}' (inline mode)" if save_as: output += f" (saved to {save_as})" output += ":\n\n" if result["status"] == 0: output += "--- Output ---\n" output += result["stdout"] if result["stdout"] else "(No output)\n" else: output += "--- Error ---\n" output += result["stderr"] if result["stderr"] else "(No error message)\n" if result["stdout"]: output += "\n--- Output ---\n" output += result["stdout"] return output except Exception as e: return f"Error in inline execution: {str(e)}\n{traceback.format_exc()}" elif execution_mode == "subprocess": # Subprocess execution (for environment isolation) environments = get_python_environments() if environment == "default" and not any(e["name"] == "default" for e in environments): environment = "system" env = next((e for e in environments if e["name"] == environment), None) if not env: return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" result = await execute_python_code_subprocess(code, env["path"], str(WORKING_DIR), timeout) output = f"Execution in '{environment}' environment (subprocess mode)" if save_as: output += f" (saved to {save_as})" output += ":\n\n" if result["status"] == 0: output += "--- Output ---\n" output += result["stdout"] if result["stdout"] else "(No output)\n" else: output += f"--- Error (status code: {result['status']}) ---\n" output += result["stderr"] if result["stderr"] else "(No error message)\n" if result["stdout"]: output += "\n--- Output ---\n" output += result["stdout"] return output else: return f"Unknown execution mode: {execution_mode}. Use 'inline' or 'subprocess'." @mcp.tool() async def run_python_file( file_path: str, environment: str = "default", arguments: Optional[List[str]] = None, timeout: int = 300 ) -> str: """ Execute a Python file (always uses subprocess for file execution). Args: file_path: Path to the Python file to execute environment: Name of the Python environment to use arguments: List of command-line arguments to pass to the script timeout: Maximum execution time in seconds (default: 300) """ path = Path(file_path) if path.is_absolute(): if not is_path_allowed(path): return f"Access denied: Can only run files in working directory: {WORKING_DIR}" else: path = WORKING_DIR / path if not path.exists(): return f"File '{path}' not found." environments = get_python_environments() if environment == "default" and not any(e["name"] == "default" for e in environments): environment = "system" env = next((e for e in environments if e["name"] == environment), None) if not env: return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" cmd = [env["path"], str(path)] if arguments: cmd.extend(arguments) result = await run_subprocess_async(cmd, cwd=str(WORKING_DIR), timeout=timeout) output = f"Execution of '{path}' in '{environment}' environment:\n\n" if result["status"] == 0: output += "--- Output ---\n" output += result["stdout"] if result["stdout"] else "(No output)\n" else: output += f"--- Error (status code: {result['status']}) ---\n" output += result["stderr"] if result["stderr"] else "(No error message)\n" if result["stdout"]: output += "\n--- Output ---\n" output += result["stdout"] return output @mcp.tool() async def install_package( package_name: str, environment: str = "default", upgrade: bool = False, timeout: int = 300 ) -> str: """ Install a Python package in the specified environment. Args: package_name: Name of the package to install environment: Name of the Python environment upgrade: Whether to upgrade if already installed timeout: Maximum execution time in seconds """ environments = get_python_environments() if environment == "default" and not any(e["name"] == "default" for e in environments): environment = "system" env = next((e for e in environments if e["name"] == environment), None) if not env: return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" cmd = [env["path"], "-m", "pip", "install"] if upgrade: cmd.append("--upgrade") cmd.append(package_name) result = await run_subprocess_async(cmd, timeout=timeout) if result["status"] == 0: return f"Successfully {'upgraded' if upgrade else 'installed'} {package_name} in {environment}." else: return f"Error installing {package_name}:\n{result['stderr']}" @mcp.tool() def read_file(file_path: str, max_size_kb: int = 1024) -> str: """ Read the content of any file, with size limits for safety. Args: file_path: Path to the file max_size_kb: Maximum file size to read in KB """ path = Path(file_path) if path.is_absolute(): if not is_path_allowed(path): return f"Access denied: Can only read files in working directory: {WORKING_DIR}" else: path = WORKING_DIR / path try: if not path.exists(): return f"Error: File '{file_path}' not found" file_size_kb = path.stat().st_size / 1024 if file_size_kb > max_size_kb: return f"Error: File size ({file_size_kb:.2f} KB) exceeds maximum ({max_size_kb} KB)" try: with open(path, 'r', encoding='utf-8') as f: content = f.read() source_extensions = ['.py', '.js', '.html', '.css', '.json', '.xml', '.md', '.txt', '.sh', '.bat', '.ps1'] if path.suffix.lower() in source_extensions: file_type = path.suffix[1:] if path.suffix else 'plain' return f"File: {file_path}\n\n```{file_type}\n{content}\n```" return f"File: {file_path}\n\n{content}" except UnicodeDecodeError: with open(path, 'rb') as f: content = f.read() hex_content = content.hex() return f"Binary file: {file_path}\nSize: {len(content)} bytes\nHex (first 1024 chars):\n{hex_content[:1024]}" except Exception as e: return f"Error reading file: {str(e)}" @mcp.tool() def write_file( file_path: str, content: str, overwrite: bool = False ) -> str: """ Write content to a file. Args: file_path: Path to the file to write content: Content to write overwrite: Whether to overwrite if exists """ path = Path(file_path) if path.is_absolute(): if not is_path_allowed(path): return f"Access denied: Can only write files in working directory: {WORKING_DIR}" else: path = WORKING_DIR / path try: if path.exists() and not overwrite: return f"File '{path}' exists. Use overwrite=True to replace." path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'w', encoding='utf-8') as f: f.write(content) f.flush() os.fsync(f.fileno()) file_size_kb = path.stat().st_size / 1024 return f"Successfully wrote to {path}. Size: {file_size_kb:.2f} KB" except Exception as e: return f"Error writing file: {str(e)}" @mcp.tool() def list_directory(directory_path: str = "") -> str: """ List all Python files in a directory. Args: directory_path: Path to directory (empty for working directory) """ try: if not directory_path: path = WORKING_DIR else: path = Path(directory_path) if path.is_absolute(): if not is_path_allowed(path): return f"Access denied: Can only list files in working directory: {WORKING_DIR}" else: path = WORKING_DIR / directory_path if not path.exists(): return f"Error: Directory '{directory_path}' not found" if not path.is_dir(): return f"Error: '{directory_path}' is not a directory" files = find_python_files(path) if not files: return f"No Python files found in {directory_path or 'working directory'}" result = f"Python files in: {directory_path or str(WORKING_DIR)}\n\n" files_by_dir = {} base_dir = path if ALLOW_SYSTEM_ACCESS else WORKING_DIR for file in files: file_path = Path(file["path"]) try: relative_path = file_path.relative_to(base_dir) parent = str(relative_path.parent) if parent == ".": parent = "(root)" except ValueError: parent = str(file_path.parent) if parent not in files_by_dir: files_by_dir[parent] = [] files_by_dir[parent].append({ "name": file["name"], "size": file["size"] }) for dir_name, dir_files in sorted(files_by_dir.items()): result += f"📁 {dir_name}:\n" for file in sorted(dir_files, key=lambda x: x["name"]): size_kb = round(file["size"] / 1024, 1) result += f" 📄 {file['name']} ({size_kb} KB)\n" result += "\n" return result except Exception as e: return f"Error listing directory: {str(e)}" @mcp.tool() def clear_session(session_id: str = "default") -> str: """ Clear a REPL session's state and history. Args: session_id: Session ID to clear """ if session_id in _sessions: del _sessions[session_id] return f"Session '{session_id}' cleared." else: return f"Session '{session_id}' not found." @mcp.tool() def list_sessions() -> str: """List all active REPL sessions.""" if not _sessions: return "No active sessions." result = "Active REPL Sessions:\n\n" for session_id, session in _sessions.items(): result += f"- Session: {session_id}\n" result += f" History entries: {len(session.history)}\n" result += f" Variables: {len([k for k in session.locals.keys() if not k.startswith('__')])}\n\n" return result # ============================================================================ # Prompts # ============================================================================ @mcp.prompt() def python_function_template(description: str) -> str: """Generate a template for a Python function with docstring.""" return f"""Please create a Python function based on this description: {description} Include: - Type hints - Docstring with parameters, return value, and examples - Error handling where appropriate - Comments for complex logic""" @mcp.prompt() def refactor_python_code(code: str) -> str: """Help refactor Python code for better readability and performance.""" return f"""Please refactor this Python code to improve readability, performance, error handling, and structure: ```python {code} ``` Explain the changes you made and why they improve the code.""" @mcp.prompt() def debug_python_error(code: str, error_message: str) -> str: """Help debug a Python error.""" return f"""I'm getting this error: ```python {code} ``` Error message: ``` {error_message} ``` Please help by: 1. Explaining what the error means 2. Identifying the cause 3. Suggesting fixes""" # Run the server if __name__ == "__main__": mcp.run(transport='stdio') ```