# Directory Structure ``` ├── .gitignore ├── .python-version ├── LICENSE ├── mcp_python_interpreter │ ├── __init__.py │ ├── main.py │ └── server.py ├── pyproject.toml ├── README.md └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.10 2 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python-generated files 2 | __pycache__/ 3 | *.py[oc] 4 | build/ 5 | dist/ 6 | wheels/ 7 | test_dir/ 8 | *.egg-info 9 | .pyirc 10 | 11 | # Virtual environments 12 | .venv 13 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | [](https://mseep.ai/app/yzfly-mcp-python-interpreter) 2 | 3 | # MCP Python Interpreter 4 | 5 | A Model Context Protocol (MCP) server that allows LLMs to interact with Python environments, read and write files, execute Python code, and manage development workflows. 6 | 7 | ## Features 8 | 9 | - **Environment Management**: List and use different Python environments (system and conda) 10 | - **Code Execution**: Run Python code or scripts in any available environment 11 | - **Package Management**: List installed packages and install new ones 12 | - **File Operations**: 13 | - Read files of any type (text, source code, binary) 14 | - Write text and binary files 15 | - **Python Prompts**: Templates for common Python tasks like function creation and debugging 16 | 17 | ## Installation 18 | 19 | You can install the MCP Python Interpreter using pip: 20 | 21 | ```bash 22 | pip install mcp-python-interpreter 23 | ``` 24 | 25 | Or with uv: 26 | 27 | ```bash 28 | uv install mcp-python-interpreter 29 | ``` 30 | 31 | ## Usage with Claude Desktop 32 | 33 | 1. Install [Claude Desktop](https://claude.ai/download) 34 | 2. Open Claude Desktop, click on menu, then Settings 35 | 3. Go to Developer tab and click "Edit Config" 36 | 4. Add the following to your `claude_desktop_config.json`: 37 | 38 | ```json 39 | { 40 | "mcpServers": { 41 | "mcp-python-interpreter": { 42 | "command": "uvx", 43 | "args": [ 44 | "mcp-python-interpreter", 45 | "--dir", 46 | "/path/to/your/work/dir", 47 | "--python-path", 48 | "/path/to/your/python" 49 | ], 50 | "env": { 51 | "MCP_ALLOW_SYSTEM_ACCESS": 0 52 | }, 53 | } 54 | } 55 | } 56 | ``` 57 | 58 | For Windows: 59 | 60 | ```json 61 | { 62 | "mcpServers": { 63 | "python-interpreter": { 64 | "command": "uvx", 65 | "args": [ 66 | "mcp-python-interpreter", 67 | "--dir", 68 | "C:\\path\\to\\your\\working\\directory", 69 | "--python-path", 70 | "/path/to/your/python" 71 | ], 72 | "env": { 73 | "MCP_ALLOW_SYSTEM_ACCESS": "0" 74 | }, 75 | } 76 | } 77 | } 78 | ``` 79 | 80 | 5. Restart Claude Desktop 81 | 6. You should now see the MCP tools icon in the chat interface 82 | 83 | The `--dir` parameter is **required** and specifies where all files will be saved and executed. This helps maintain security by isolating the MCP server to a specific directory. 84 | 85 | ### Prerequisites 86 | 87 | - Make sure you have `uv` installed. If not, install it using: 88 | ```bash 89 | curl -LsSf https://astral.sh/uv/install.sh | sh 90 | ``` 91 | - For Windows: 92 | ```powershell 93 | powershell -ExecutionPolicy Bypass -Command "iwr -useb https://astral.sh/uv/install.ps1 | iex" 94 | ``` 95 | 96 | ## Available Tools 97 | 98 | The Python Interpreter provides the following tools: 99 | 100 | ### Environment and Package Management 101 | - **list_python_environments**: List all available Python environments (system and conda) 102 | - **list_installed_packages**: List packages installed in a specific environment 103 | - **install_package**: Install a Python package in a specific environment 104 | 105 | ### Code Execution 106 | - **run_python_code**: Execute Python code in a specific environment 107 | - **run_python_file**: Execute a Python file in a specific environment 108 | 109 | ### File Operations 110 | - **read_file**: Read contents of any file type, with size and safety limits 111 | - Supports text files with syntax highlighting 112 | - Displays hex representation for binary files 113 | - **write_file**: Create or overwrite files with text or binary content 114 | - **write_python_file**: Create or overwrite a Python file specifically 115 | - **list_directory**: List Python files in a directory 116 | 117 | ## Available Resources 118 | 119 | - **python://environments**: List all available Python environments 120 | - **python://packages/{env_name}**: List installed packages for a specific environment 121 | - **python://file/{file_path}**: Get the content of a Python file 122 | - **python://directory/{directory_path}**: List all Python files in a directory 123 | 124 | ## Prompts 125 | 126 | - **python_function_template**: Generate a template for a Python function 127 | - **refactor_python_code**: Help refactor Python code 128 | - **debug_python_error**: Help debug a Python error 129 | 130 | ## Example Usage 131 | 132 | Here are some examples of what you can ask Claude to do with this MCP server: 133 | 134 | - "Show me all available Python environments on my system" 135 | - "Run this Python code in my conda-base environment: print('Hello, world!')" 136 | - "Create a new Python file called 'hello.py' with a function that says hello" 137 | - "Read the contents of my 'data.json' file" 138 | - "Write a new configuration file with these settings..." 139 | - "List all packages installed in my system Python environment" 140 | - "Install the requests package in my system Python environment" 141 | - "Run data_analysis.py with these arguments: --input=data.csv --output=results.csv" 142 | 143 | ## File Handling Capabilities 144 | 145 | The MCP Python Interpreter now supports comprehensive file operations: 146 | - Read text and binary files up to 1MB 147 | - Write text and binary files 148 | - Syntax highlighting for source code files 149 | - Hex representation for binary files 150 | - Strict file path security (only within the working directory) 151 | 152 | ## Security Considerations 153 | 154 | This MCP server has access to your Python environments and file system. Key security features include: 155 | - Isolated working directory 156 | - File size limits 157 | - Prevented writes outside the working directory 158 | - Explicit overwrite protection 159 | 160 | Always be cautious about running code or file operations that you don't fully understand. 161 | 162 | ## License 163 | 164 | MIT 165 | ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/main.py: -------------------------------------------------------------------------------- ```python 1 | """Main module for mcp-python-interpreter.""" 2 | 3 | from mcp_python_interpreter.server import mcp 4 | 5 | 6 | def main(): 7 | """Run the MCP Python Interpreter server.""" 8 | mcp.run(transport='stdio') 9 | 10 | 11 | if __name__ == "__main__": 12 | main() ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | MCP Python Interpreter 3 | 4 | A Model Context Protocol server for Python code execution and environment management. 5 | """ 6 | 7 | __version__ = "1.2.3" 8 | __author__ = "YZFly" 9 | __email__ = "[email protected]" 10 | 11 | from mcp_python_interpreter.server import mcp 12 | 13 | __all__ = ["mcp"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [build-system] 2 | requires = ["setuptools>=61.0", "wheel"] 3 | build-backend = "setuptools.build_meta" 4 | 5 | [project] 6 | name = "mcp-python-interpreter" 7 | version = "1.2.3" 8 | description = "MCP server for Python code execution and environment management" 9 | authors = [ 10 | {name = "YZFly", email = "[email protected]"}, 11 | ] 12 | readme = "README.md" 13 | requires-python = ">=3.10" 14 | classifiers = [ 15 | "Programming Language :: Python :: 3", 16 | "License :: OSI Approved :: MIT License", 17 | "Operating System :: OS Independent", 18 | ] 19 | dependencies = [ 20 | "fastmcp>=2.0.0", 21 | ] 22 | 23 | [project.scripts] 24 | mcp-python-interpreter = "mcp_python_interpreter.main:main" 25 | 26 | [project.urls] 27 | "Homepage" = "https://github.com/yzfly/mcp-python-interpreter" 28 | "Bug Tracker" = "https://github.com/yzfly/mcp-python-interpreter/issues" 29 | 30 | [tool.setuptools] 31 | packages = ["mcp_python_interpreter"] ``` -------------------------------------------------------------------------------- /mcp_python_interpreter/server.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | MCP Python Interpreter 3 | 4 | A Model Context Protocol server for interacting with Python environments 5 | and executing Python code. Supports both in-process execution (default, fast) 6 | and subprocess execution (for environment isolation). 7 | """ 8 | 9 | import os 10 | import sys 11 | import json 12 | import subprocess 13 | import tempfile 14 | import argparse 15 | import traceback 16 | import builtins 17 | from pathlib import Path 18 | from io import StringIO 19 | from typing import Dict, List, Optional, Any 20 | import asyncio 21 | import concurrent.futures 22 | 23 | # Import MCP SDK 24 | from mcp.server.fastmcp import FastMCP 25 | 26 | # Parse command line arguments 27 | parser = argparse.ArgumentParser(description='MCP Python Interpreter') 28 | parser.add_argument('--dir', type=str, default=os.getcwd(), 29 | help='Working directory for code execution and file operations') 30 | parser.add_argument('--python-path', type=str, default=None, 31 | help='Custom Python interpreter path to use as default') 32 | args, unknown = parser.parse_known_args() 33 | 34 | # Configuration 35 | ALLOW_SYSTEM_ACCESS = os.environ.get('MCP_ALLOW_SYSTEM_ACCESS', 'false').lower() in ('true', '1', 'yes') 36 | WORKING_DIR = Path(args.dir).absolute() 37 | WORKING_DIR.mkdir(parents=True, exist_ok=True) 38 | DEFAULT_PYTHON_PATH = args.python_path if args.python_path else sys.executable 39 | 40 | # Startup message 41 | print(f"MCP Python Interpreter starting in directory: {WORKING_DIR}", file=sys.stderr) 42 | print(f"Using default Python interpreter: {DEFAULT_PYTHON_PATH}", file=sys.stderr) 43 | print(f"System-wide file access: {'ENABLED' if ALLOW_SYSTEM_ACCESS else 'DISABLED'}", file=sys.stderr) 44 | print(f"Platform: {sys.platform}", file=sys.stderr) 45 | 46 | # Create MCP server 47 | mcp = FastMCP("python-interpreter") 48 | 49 | # Thread pool for subprocess fallback 50 | _executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) 51 | 52 | # ============================================================================ 53 | # REPL Session Management (for in-process execution) 54 | # ============================================================================ 55 | 56 | class ReplSession: 57 | """Manages a Python REPL session with persistent state.""" 58 | 59 | def __init__(self): 60 | self.locals = { 61 | "__builtins__": builtins, 62 | "__name__": "__main__", 63 | "__doc__": None, 64 | "__package__": None, 65 | } 66 | self.history = [] 67 | 68 | def execute(self, code: str, timeout: Optional[int] = None) -> Dict[str, Any]: 69 | """ 70 | Execute Python code in this session. 71 | 72 | Args: 73 | code: Python code to execute 74 | timeout: Optional timeout (not enforced for inline execution) 75 | 76 | Returns: 77 | Dict with stdout, stderr, result, and status 78 | """ 79 | stdout_capture = StringIO() 80 | stderr_capture = StringIO() 81 | 82 | # Save original streams 83 | old_stdout, old_stderr = sys.stdout, sys.stderr 84 | sys.stdout, sys.stderr = stdout_capture, stderr_capture 85 | 86 | result_value = None 87 | status = 0 88 | 89 | try: 90 | # Change to working directory for execution 91 | old_cwd = os.getcwd() 92 | os.chdir(WORKING_DIR) 93 | 94 | try: 95 | # Try to evaluate as expression first 96 | try: 97 | result_value = eval(code, self.locals) 98 | if result_value is not None: 99 | print(repr(result_value)) 100 | except SyntaxError: 101 | # If not an expression, execute as statement 102 | exec(code, self.locals) 103 | 104 | except Exception: 105 | traceback.print_exc() 106 | status = 1 107 | finally: 108 | os.chdir(old_cwd) 109 | 110 | finally: 111 | # Restore original streams 112 | sys.stdout, sys.stderr = old_stdout, old_stderr 113 | 114 | return { 115 | "stdout": stdout_capture.getvalue(), 116 | "stderr": stderr_capture.getvalue(), 117 | "result": result_value, 118 | "status": status 119 | } 120 | 121 | # Global sessions storage 122 | _sessions: Dict[str, ReplSession] = {} 123 | 124 | def get_session(session_id: str = "default") -> ReplSession: 125 | """Get or create a REPL session.""" 126 | if session_id not in _sessions: 127 | _sessions[session_id] = ReplSession() 128 | return _sessions[session_id] 129 | 130 | # ============================================================================ 131 | # Helper functions 132 | # ============================================================================ 133 | 134 | def is_path_allowed(path: Path) -> bool: 135 | """Check if a path is allowed based on security settings.""" 136 | if ALLOW_SYSTEM_ACCESS: 137 | return True 138 | 139 | try: 140 | path.resolve().relative_to(WORKING_DIR.resolve()) 141 | return True 142 | except ValueError: 143 | return False 144 | 145 | 146 | def _run_subprocess_sync( 147 | cmd: List[str], 148 | cwd: Optional[str] = None, 149 | timeout: int = 300 150 | ) -> Dict[str, Any]: 151 | """Synchronous subprocess execution for Windows compatibility.""" 152 | try: 153 | creation_flags = 0 154 | if sys.platform == "win32": 155 | creation_flags = subprocess.CREATE_NO_WINDOW 156 | try: 157 | creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP 158 | except AttributeError: 159 | pass 160 | 161 | result = subprocess.run( 162 | cmd, 163 | cwd=cwd, 164 | capture_output=True, 165 | text=True, 166 | timeout=timeout, 167 | creationflags=creation_flags if sys.platform == "win32" else 0, 168 | encoding='utf-8', 169 | errors='replace', 170 | stdin=subprocess.DEVNULL 171 | ) 172 | 173 | return { 174 | "stdout": result.stdout, 175 | "stderr": result.stderr, 176 | "status": result.returncode 177 | } 178 | 179 | except subprocess.TimeoutExpired as e: 180 | stdout = e.stdout.decode('utf-8', errors='replace') if e.stdout else "" 181 | stderr = e.stderr.decode('utf-8', errors='replace') if e.stderr else "" 182 | 183 | return { 184 | "stdout": stdout, 185 | "stderr": stderr + f"\nExecution timed out after {timeout} seconds", 186 | "status": -1 187 | } 188 | 189 | except Exception as e: 190 | return { 191 | "stdout": "", 192 | "stderr": f"Error executing command: {str(e)}", 193 | "status": -1 194 | } 195 | 196 | 197 | async def run_subprocess_async( 198 | cmd: List[str], 199 | cwd: Optional[str] = None, 200 | timeout: int = 300, 201 | input_data: Optional[str] = None 202 | ) -> Dict[str, Any]: 203 | """Run subprocess with Windows compatibility.""" 204 | 205 | # On Windows, use thread pool for reliability 206 | if sys.platform == "win32": 207 | if input_data: 208 | print("Warning: input_data not supported on Windows sync mode", file=sys.stderr) 209 | 210 | loop = asyncio.get_event_loop() 211 | from functools import partial 212 | func = partial(_run_subprocess_sync, cmd, cwd, timeout) 213 | result = await loop.run_in_executor(_executor, func) 214 | return result 215 | 216 | # On Unix, use asyncio 217 | try: 218 | process = await asyncio.create_subprocess_exec( 219 | *cmd, 220 | stdout=asyncio.subprocess.PIPE, 221 | stderr=asyncio.subprocess.PIPE, 222 | stdin=asyncio.subprocess.PIPE if input_data else asyncio.subprocess.DEVNULL, 223 | cwd=cwd 224 | ) 225 | 226 | try: 227 | stdout, stderr = await asyncio.wait_for( 228 | process.communicate(input=input_data.encode('utf-8') if input_data else None), 229 | timeout=timeout 230 | ) 231 | 232 | return { 233 | "stdout": stdout.decode('utf-8', errors='replace'), 234 | "stderr": stderr.decode('utf-8', errors='replace'), 235 | "status": process.returncode 236 | } 237 | 238 | except asyncio.TimeoutError: 239 | try: 240 | process.kill() 241 | await process.wait() 242 | except: 243 | pass 244 | 245 | return { 246 | "stdout": "", 247 | "stderr": f"Execution timed out after {timeout} seconds", 248 | "status": -1 249 | } 250 | 251 | except Exception as e: 252 | return { 253 | "stdout": "", 254 | "stderr": f"Error executing command: {str(e)}", 255 | "status": -1 256 | } 257 | 258 | 259 | async def execute_python_code_subprocess( 260 | code: str, 261 | python_path: Optional[str] = None, 262 | working_dir: Optional[str] = None, 263 | timeout: int = 300 264 | ) -> Dict[str, Any]: 265 | """Execute Python code via subprocess (for environment isolation).""" 266 | if python_path is None: 267 | python_path = DEFAULT_PYTHON_PATH 268 | 269 | temp_file = None 270 | try: 271 | fd, temp_file = tempfile.mkstemp(suffix='.py', text=True) 272 | 273 | try: 274 | with os.fdopen(fd, 'w', encoding='utf-8') as f: 275 | f.write(code) 276 | f.flush() 277 | os.fsync(f.fileno()) 278 | except Exception as e: 279 | os.close(fd) 280 | raise e 281 | 282 | if sys.platform == "win32": 283 | await asyncio.sleep(0.05) 284 | temp_file = os.path.abspath(temp_file) 285 | if working_dir: 286 | working_dir = os.path.abspath(working_dir) 287 | 288 | result = await run_subprocess_async( 289 | [python_path, temp_file], 290 | cwd=working_dir, 291 | timeout=timeout 292 | ) 293 | 294 | return result 295 | 296 | finally: 297 | if temp_file: 298 | try: 299 | if sys.platform == "win32": 300 | await asyncio.sleep(0.05) 301 | 302 | if os.path.exists(temp_file): 303 | os.unlink(temp_file) 304 | except Exception as e: 305 | print(f"Warning: Could not delete temp file {temp_file}: {e}", file=sys.stderr) 306 | 307 | 308 | def get_python_environments() -> List[Dict[str, str]]: 309 | """Get all available Python environments.""" 310 | environments = [] 311 | 312 | if DEFAULT_PYTHON_PATH != sys.executable: 313 | try: 314 | result = subprocess.run( 315 | [DEFAULT_PYTHON_PATH, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}')"], 316 | capture_output=True, text=True, check=True, timeout=10, 317 | stdin=subprocess.DEVNULL, 318 | creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 319 | ) 320 | version = result.stdout.strip() 321 | 322 | environments.append({ 323 | "name": "default", 324 | "path": DEFAULT_PYTHON_PATH, 325 | "version": version 326 | }) 327 | except Exception as e: 328 | print(f"Error getting version for custom Python path: {e}", file=sys.stderr) 329 | 330 | environments.append({ 331 | "name": "system", 332 | "path": sys.executable, 333 | "version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" 334 | }) 335 | 336 | # Try conda environments 337 | try: 338 | result = subprocess.run( 339 | ["conda", "info", "--envs", "--json"], 340 | capture_output=True, text=True, check=False, timeout=10, 341 | stdin=subprocess.DEVNULL, 342 | creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 343 | ) 344 | 345 | if result.returncode == 0: 346 | conda_info = json.loads(result.stdout) 347 | for env in conda_info.get("envs", []): 348 | env_name = os.path.basename(env) 349 | if env_name == "base": 350 | env_name = "conda-base" 351 | 352 | python_path = os.path.join(env, "bin", "python") 353 | if not os.path.exists(python_path): 354 | python_path = os.path.join(env, "python.exe") 355 | 356 | if os.path.exists(python_path): 357 | try: 358 | version_result = subprocess.run( 359 | [python_path, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}')"], 360 | capture_output=True, text=True, check=True, timeout=10, 361 | stdin=subprocess.DEVNULL, 362 | creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 363 | ) 364 | version = version_result.stdout.strip() 365 | 366 | environments.append({ 367 | "name": env_name, 368 | "path": python_path, 369 | "version": version 370 | }) 371 | except Exception: 372 | pass 373 | except Exception as e: 374 | print(f"Error getting conda environments: {e}", file=sys.stderr) 375 | 376 | return environments 377 | 378 | 379 | def get_installed_packages(python_path: str) -> List[Dict[str, str]]: 380 | """Get installed packages for a specific Python environment.""" 381 | try: 382 | result = subprocess.run( 383 | [python_path, "-m", "pip", "list", "--format=json"], 384 | capture_output=True, text=True, check=True, timeout=30, 385 | stdin=subprocess.DEVNULL, 386 | creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 387 | ) 388 | return json.loads(result.stdout) 389 | except Exception as e: 390 | print(f"Error getting installed packages: {e}", file=sys.stderr) 391 | return [] 392 | 393 | 394 | def find_python_files(directory: Path) -> List[Dict[str, str]]: 395 | """Find all Python files in a directory.""" 396 | files = [] 397 | 398 | if not directory.exists(): 399 | return files 400 | 401 | for path in directory.rglob("*.py"): 402 | if path.is_file(): 403 | files.append({ 404 | "path": str(path), 405 | "name": path.name, 406 | "size": path.stat().st_size, 407 | "modified": path.stat().st_mtime 408 | }) 409 | 410 | return files 411 | 412 | 413 | # ============================================================================ 414 | # Resources 415 | # ============================================================================ 416 | 417 | @mcp.resource("python://environments") 418 | def get_environments_resource() -> str: 419 | """List all available Python environments as a resource.""" 420 | environments = get_python_environments() 421 | return json.dumps(environments, indent=2) 422 | 423 | 424 | @mcp.resource("python://packages/{env_name}") 425 | def get_packages_resource(env_name: str) -> str: 426 | """List installed packages for a specific environment as a resource.""" 427 | environments = get_python_environments() 428 | 429 | env = next((e for e in environments if e["name"] == env_name), None) 430 | if not env: 431 | return json.dumps({"error": f"Environment '{env_name}' not found"}) 432 | 433 | packages = get_installed_packages(env["path"]) 434 | return json.dumps(packages, indent=2) 435 | 436 | 437 | @mcp.resource("python://directory") 438 | def get_working_directory_listing() -> str: 439 | """List all Python files in the working directory as a resource.""" 440 | try: 441 | files = find_python_files(WORKING_DIR) 442 | return json.dumps({ 443 | "working_directory": str(WORKING_DIR), 444 | "files": files 445 | }, indent=2) 446 | except Exception as e: 447 | return json.dumps({"error": f"Error listing directory: {str(e)}"}) 448 | 449 | 450 | @mcp.resource("python://session/{session_id}/history") 451 | def get_session_history(session_id: str) -> str: 452 | """Get execution history for a REPL session.""" 453 | if session_id not in _sessions: 454 | return json.dumps({"error": f"Session '{session_id}' not found"}) 455 | 456 | session = _sessions[session_id] 457 | return json.dumps({ 458 | "session_id": session_id, 459 | "history": session.history 460 | }, indent=2) 461 | 462 | 463 | # ============================================================================ 464 | # Tools 465 | # ============================================================================ 466 | 467 | @mcp.tool() 468 | def list_python_environments() -> str: 469 | """List all available Python environments (system Python and conda environments).""" 470 | environments = get_python_environments() 471 | 472 | if not environments: 473 | return "No Python environments found." 474 | 475 | result = "Available Python Environments:\n\n" 476 | for env in environments: 477 | result += f"- Name: {env['name']}\n" 478 | result += f" Path: {env['path']}\n" 479 | result += f" Version: Python {env['version']}\n\n" 480 | 481 | return result 482 | 483 | 484 | @mcp.tool() 485 | def list_installed_packages(environment: str = "default") -> str: 486 | """ 487 | List installed packages for a specific Python environment. 488 | 489 | Args: 490 | environment: Name of the Python environment 491 | """ 492 | environments = get_python_environments() 493 | 494 | if environment == "default" and not any(e["name"] == "default" for e in environments): 495 | environment = "system" 496 | 497 | env = next((e for e in environments if e["name"] == environment), None) 498 | if not env: 499 | return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" 500 | 501 | packages = get_installed_packages(env["path"]) 502 | 503 | if not packages: 504 | return f"No packages found in environment '{environment}'." 505 | 506 | result = f"Installed Packages in '{environment}':\n\n" 507 | for pkg in packages: 508 | result += f"- {pkg['name']} {pkg['version']}\n" 509 | 510 | return result 511 | 512 | 513 | @mcp.tool() 514 | async def run_python_code( 515 | code: str, 516 | execution_mode: str = "inline", 517 | session_id: str = "default", 518 | environment: str = "system", 519 | save_as: Optional[str] = None, 520 | timeout: int = 300 521 | ) -> str: 522 | """ 523 | Execute Python code with flexible execution modes. 524 | 525 | Args: 526 | code: Python code to execute 527 | execution_mode: Execution mode - "inline" (default, fast, in-process) or "subprocess" (isolated) 528 | session_id: Session ID for inline mode to maintain state across executions 529 | environment: Python environment name (only for subprocess mode) 530 | save_as: Optional filename to save the code before execution 531 | timeout: Maximum execution time in seconds (only enforced for subprocess mode) 532 | 533 | Returns: 534 | Execution result with output 535 | 536 | Execution modes: 537 | - "inline" (default): Executes code in the current process. Fast and reliable, 538 | maintains session state. Use for most code execution tasks. 539 | - "subprocess": Executes code in a separate Python process. Use when you need 540 | environment isolation or a different Python environment. 541 | """ 542 | 543 | # Save code if requested 544 | if save_as: 545 | save_path = WORKING_DIR / save_as 546 | if not save_path.suffix == '.py': 547 | save_path = save_path.with_suffix('.py') 548 | 549 | try: 550 | save_path.parent.mkdir(parents=True, exist_ok=True) 551 | with open(save_path, 'w', encoding='utf-8') as f: 552 | f.write(code) 553 | except Exception as e: 554 | return f"Error saving code to file: {str(e)}" 555 | 556 | # Execute based on mode 557 | if execution_mode == "inline": 558 | # In-process execution (default, fast, no subprocess issues) 559 | try: 560 | session = get_session(session_id) 561 | result = session.execute(code, timeout) 562 | 563 | # Store in history 564 | session.history.append({ 565 | "code": code, 566 | "stdout": result["stdout"], 567 | "stderr": result["stderr"], 568 | "status": result["status"] 569 | }) 570 | 571 | output = f"Execution in session '{session_id}' (inline mode)" 572 | if save_as: 573 | output += f" (saved to {save_as})" 574 | output += ":\n\n" 575 | 576 | if result["status"] == 0: 577 | output += "--- Output ---\n" 578 | output += result["stdout"] if result["stdout"] else "(No output)\n" 579 | else: 580 | output += "--- Error ---\n" 581 | output += result["stderr"] if result["stderr"] else "(No error message)\n" 582 | 583 | if result["stdout"]: 584 | output += "\n--- Output ---\n" 585 | output += result["stdout"] 586 | 587 | return output 588 | 589 | except Exception as e: 590 | return f"Error in inline execution: {str(e)}\n{traceback.format_exc()}" 591 | 592 | elif execution_mode == "subprocess": 593 | # Subprocess execution (for environment isolation) 594 | environments = get_python_environments() 595 | 596 | if environment == "default" and not any(e["name"] == "default" for e in environments): 597 | environment = "system" 598 | 599 | env = next((e for e in environments if e["name"] == environment), None) 600 | if not env: 601 | return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" 602 | 603 | result = await execute_python_code_subprocess(code, env["path"], str(WORKING_DIR), timeout) 604 | 605 | output = f"Execution in '{environment}' environment (subprocess mode)" 606 | if save_as: 607 | output += f" (saved to {save_as})" 608 | output += ":\n\n" 609 | 610 | if result["status"] == 0: 611 | output += "--- Output ---\n" 612 | output += result["stdout"] if result["stdout"] else "(No output)\n" 613 | else: 614 | output += f"--- Error (status code: {result['status']}) ---\n" 615 | output += result["stderr"] if result["stderr"] else "(No error message)\n" 616 | 617 | if result["stdout"]: 618 | output += "\n--- Output ---\n" 619 | output += result["stdout"] 620 | 621 | return output 622 | 623 | else: 624 | return f"Unknown execution mode: {execution_mode}. Use 'inline' or 'subprocess'." 625 | 626 | 627 | @mcp.tool() 628 | async def run_python_file( 629 | file_path: str, 630 | environment: str = "default", 631 | arguments: Optional[List[str]] = None, 632 | timeout: int = 300 633 | ) -> str: 634 | """ 635 | Execute a Python file (always uses subprocess for file execution). 636 | 637 | Args: 638 | file_path: Path to the Python file to execute 639 | environment: Name of the Python environment to use 640 | arguments: List of command-line arguments to pass to the script 641 | timeout: Maximum execution time in seconds (default: 300) 642 | """ 643 | path = Path(file_path) 644 | if path.is_absolute(): 645 | if not is_path_allowed(path): 646 | return f"Access denied: Can only run files in working directory: {WORKING_DIR}" 647 | else: 648 | path = WORKING_DIR / path 649 | 650 | if not path.exists(): 651 | return f"File '{path}' not found." 652 | 653 | environments = get_python_environments() 654 | 655 | if environment == "default" and not any(e["name"] == "default" for e in environments): 656 | environment = "system" 657 | 658 | env = next((e for e in environments if e["name"] == environment), None) 659 | if not env: 660 | return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" 661 | 662 | cmd = [env["path"], str(path)] 663 | if arguments: 664 | cmd.extend(arguments) 665 | 666 | result = await run_subprocess_async(cmd, cwd=str(WORKING_DIR), timeout=timeout) 667 | 668 | output = f"Execution of '{path}' in '{environment}' environment:\n\n" 669 | 670 | if result["status"] == 0: 671 | output += "--- Output ---\n" 672 | output += result["stdout"] if result["stdout"] else "(No output)\n" 673 | else: 674 | output += f"--- Error (status code: {result['status']}) ---\n" 675 | output += result["stderr"] if result["stderr"] else "(No error message)\n" 676 | 677 | if result["stdout"]: 678 | output += "\n--- Output ---\n" 679 | output += result["stdout"] 680 | 681 | return output 682 | 683 | 684 | @mcp.tool() 685 | async def install_package( 686 | package_name: str, 687 | environment: str = "default", 688 | upgrade: bool = False, 689 | timeout: int = 300 690 | ) -> str: 691 | """ 692 | Install a Python package in the specified environment. 693 | 694 | Args: 695 | package_name: Name of the package to install 696 | environment: Name of the Python environment 697 | upgrade: Whether to upgrade if already installed 698 | timeout: Maximum execution time in seconds 699 | """ 700 | environments = get_python_environments() 701 | 702 | if environment == "default" and not any(e["name"] == "default" for e in environments): 703 | environment = "system" 704 | 705 | env = next((e for e in environments if e["name"] == environment), None) 706 | if not env: 707 | return f"Environment '{environment}' not found. Available: {', '.join(e['name'] for e in environments)}" 708 | 709 | cmd = [env["path"], "-m", "pip", "install"] 710 | if upgrade: 711 | cmd.append("--upgrade") 712 | cmd.append(package_name) 713 | 714 | result = await run_subprocess_async(cmd, timeout=timeout) 715 | 716 | if result["status"] == 0: 717 | return f"Successfully {'upgraded' if upgrade else 'installed'} {package_name} in {environment}." 718 | else: 719 | return f"Error installing {package_name}:\n{result['stderr']}" 720 | 721 | 722 | @mcp.tool() 723 | def read_file(file_path: str, max_size_kb: int = 1024) -> str: 724 | """ 725 | Read the content of any file, with size limits for safety. 726 | 727 | Args: 728 | file_path: Path to the file 729 | max_size_kb: Maximum file size to read in KB 730 | """ 731 | path = Path(file_path) 732 | if path.is_absolute(): 733 | if not is_path_allowed(path): 734 | return f"Access denied: Can only read files in working directory: {WORKING_DIR}" 735 | else: 736 | path = WORKING_DIR / path 737 | 738 | try: 739 | if not path.exists(): 740 | return f"Error: File '{file_path}' not found" 741 | 742 | file_size_kb = path.stat().st_size / 1024 743 | if file_size_kb > max_size_kb: 744 | return f"Error: File size ({file_size_kb:.2f} KB) exceeds maximum ({max_size_kb} KB)" 745 | 746 | try: 747 | with open(path, 'r', encoding='utf-8') as f: 748 | content = f.read() 749 | 750 | source_extensions = ['.py', '.js', '.html', '.css', '.json', '.xml', '.md', '.txt', '.sh', '.bat', '.ps1'] 751 | if path.suffix.lower() in source_extensions: 752 | file_type = path.suffix[1:] if path.suffix else 'plain' 753 | return f"File: {file_path}\n\n```{file_type}\n{content}\n```" 754 | 755 | return f"File: {file_path}\n\n{content}" 756 | 757 | except UnicodeDecodeError: 758 | with open(path, 'rb') as f: 759 | content = f.read() 760 | hex_content = content.hex() 761 | return f"Binary file: {file_path}\nSize: {len(content)} bytes\nHex (first 1024 chars):\n{hex_content[:1024]}" 762 | 763 | except Exception as e: 764 | return f"Error reading file: {str(e)}" 765 | 766 | 767 | @mcp.tool() 768 | def write_file( 769 | file_path: str, 770 | content: str, 771 | overwrite: bool = False 772 | ) -> str: 773 | """ 774 | Write content to a file. 775 | 776 | Args: 777 | file_path: Path to the file to write 778 | content: Content to write 779 | overwrite: Whether to overwrite if exists 780 | """ 781 | path = Path(file_path) 782 | if path.is_absolute(): 783 | if not is_path_allowed(path): 784 | return f"Access denied: Can only write files in working directory: {WORKING_DIR}" 785 | else: 786 | path = WORKING_DIR / path 787 | 788 | try: 789 | if path.exists() and not overwrite: 790 | return f"File '{path}' exists. Use overwrite=True to replace." 791 | 792 | path.parent.mkdir(parents=True, exist_ok=True) 793 | 794 | with open(path, 'w', encoding='utf-8') as f: 795 | f.write(content) 796 | f.flush() 797 | os.fsync(f.fileno()) 798 | 799 | file_size_kb = path.stat().st_size / 1024 800 | return f"Successfully wrote to {path}. Size: {file_size_kb:.2f} KB" 801 | 802 | except Exception as e: 803 | return f"Error writing file: {str(e)}" 804 | 805 | 806 | @mcp.tool() 807 | def list_directory(directory_path: str = "") -> str: 808 | """ 809 | List all Python files in a directory. 810 | 811 | Args: 812 | directory_path: Path to directory (empty for working directory) 813 | """ 814 | try: 815 | if not directory_path: 816 | path = WORKING_DIR 817 | else: 818 | path = Path(directory_path) 819 | if path.is_absolute(): 820 | if not is_path_allowed(path): 821 | return f"Access denied: Can only list files in working directory: {WORKING_DIR}" 822 | else: 823 | path = WORKING_DIR / directory_path 824 | 825 | if not path.exists(): 826 | return f"Error: Directory '{directory_path}' not found" 827 | 828 | if not path.is_dir(): 829 | return f"Error: '{directory_path}' is not a directory" 830 | 831 | files = find_python_files(path) 832 | 833 | if not files: 834 | return f"No Python files found in {directory_path or 'working directory'}" 835 | 836 | result = f"Python files in: {directory_path or str(WORKING_DIR)}\n\n" 837 | 838 | files_by_dir = {} 839 | base_dir = path if ALLOW_SYSTEM_ACCESS else WORKING_DIR 840 | 841 | for file in files: 842 | file_path = Path(file["path"]) 843 | try: 844 | relative_path = file_path.relative_to(base_dir) 845 | parent = str(relative_path.parent) 846 | if parent == ".": 847 | parent = "(root)" 848 | except ValueError: 849 | parent = str(file_path.parent) 850 | 851 | if parent not in files_by_dir: 852 | files_by_dir[parent] = [] 853 | 854 | files_by_dir[parent].append({ 855 | "name": file["name"], 856 | "size": file["size"] 857 | }) 858 | 859 | for dir_name, dir_files in sorted(files_by_dir.items()): 860 | result += f"📁 {dir_name}:\n" 861 | for file in sorted(dir_files, key=lambda x: x["name"]): 862 | size_kb = round(file["size"] / 1024, 1) 863 | result += f" 📄 {file['name']} ({size_kb} KB)\n" 864 | result += "\n" 865 | 866 | return result 867 | except Exception as e: 868 | return f"Error listing directory: {str(e)}" 869 | 870 | 871 | @mcp.tool() 872 | def clear_session(session_id: str = "default") -> str: 873 | """ 874 | Clear a REPL session's state and history. 875 | 876 | Args: 877 | session_id: Session ID to clear 878 | """ 879 | if session_id in _sessions: 880 | del _sessions[session_id] 881 | return f"Session '{session_id}' cleared." 882 | else: 883 | return f"Session '{session_id}' not found." 884 | 885 | 886 | @mcp.tool() 887 | def list_sessions() -> str: 888 | """List all active REPL sessions.""" 889 | if not _sessions: 890 | return "No active sessions." 891 | 892 | result = "Active REPL Sessions:\n\n" 893 | for session_id, session in _sessions.items(): 894 | result += f"- Session: {session_id}\n" 895 | result += f" History entries: {len(session.history)}\n" 896 | result += f" Variables: {len([k for k in session.locals.keys() if not k.startswith('__')])}\n\n" 897 | 898 | return result 899 | 900 | 901 | # ============================================================================ 902 | # Prompts 903 | # ============================================================================ 904 | 905 | @mcp.prompt() 906 | def python_function_template(description: str) -> str: 907 | """Generate a template for a Python function with docstring.""" 908 | return f"""Please create a Python function based on this description: 909 | 910 | {description} 911 | 912 | Include: 913 | - Type hints 914 | - Docstring with parameters, return value, and examples 915 | - Error handling where appropriate 916 | - Comments for complex logic""" 917 | 918 | 919 | @mcp.prompt() 920 | def refactor_python_code(code: str) -> str: 921 | """Help refactor Python code for better readability and performance.""" 922 | return f"""Please refactor this Python code to improve readability, performance, error handling, and structure: 923 | 924 | ```python 925 | {code} 926 | ``` 927 | 928 | Explain the changes you made and why they improve the code.""" 929 | 930 | 931 | @mcp.prompt() 932 | def debug_python_error(code: str, error_message: str) -> str: 933 | """Help debug a Python error.""" 934 | return f"""I'm getting this error: 935 | 936 | ```python 937 | {code} 938 | ``` 939 | 940 | Error message: 941 | ``` 942 | {error_message} 943 | ``` 944 | 945 | Please help by: 946 | 1. Explaining what the error means 947 | 2. Identifying the cause 948 | 3. Suggesting fixes""" 949 | 950 | 951 | # Run the server 952 | if __name__ == "__main__": 953 | mcp.run(transport='stdio') ```