# Directory Structure
```
├── .envrc
├── .gitignore
├── .vscode
│   └── settings.json
├── CHANGELOG.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── claude_desktop_integration.md
│   ├── image.jpg
│   ├── iterm2-enable-python-api.png
│   ├── quickstart.md
│   └── uv_guide.md
├── filter
│   ├── blacklist.txt
│   └── whitelist.txt
├── LICENSE
├── Makefile
├── mcp_terminal.py
├── prompt.md
├── pyproject.toml
├── README.en.md
├── README.md
├── shell.nix
├── src
│   ├── __init__.py
│   └── mcp_terminal
│       ├── __init__.py
│       ├── controllers
│       │   ├── __init__.py
│       │   ├── applescript.py
│       │   ├── base.py
│       │   ├── iterm.py
│       │   └── subprocess.py
│       ├── security
│       │   ├── __init__.py
│       │   └── command_filter.py
│       ├── server.py
│       └── tools
│           ├── __init__.py
│           ├── file.py
│           └── terminal.py
├── tests
│   ├── __init__.py
│   ├── test_command_filter.py
│   ├── test_subprocess_controller.py
│   └── test_terminal_security.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.envrc:
--------------------------------------------------------------------------------
```
export UV_LINK_MODE=copy
PATH_add ".venv/bin"
export VIRTUAL_ENV=$(pwd)/.venv
export PYTHONDONTWRITEBYTECODE=1
export TZ=Asia/Shanghai
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
```
--------------------------------------------------------------------------------
/README.en.md:
--------------------------------------------------------------------------------
```markdown
<div align="center">
# MCP Terminal
MCP Terminal is a terminal control server based on MCP (Model Context Protocol), designed specifically for integration with Large Language Models (LLMs) and AI assistants. It provides a standardized interface that enables AI to execute terminal commands and retrieve output results.
<a href="https://glama.ai/mcp/servers/@sichang824/mcp-terminal">
  <img width="380" height="200" src="https://glama.ai/mcp/servers/@sichang824/mcp-terminal/badge" alt="Terminal MCP server" />
</a>
English | [中文](README.md)
</div>
## Video
[Watch the demo video on Bilibili](https://player.bilibili.com/player.html?isOutside=true&aid=114488023981820&bvid=BV1agEuzGE1X&cid=29904276803&p=1)
## Features
- Implemented using the official MCP SDK
- Supports multiple terminal controllers:
  - **iTerm2 Controller**: Provides advanced control on macOS using iTerm2's Python API
  - **AppleScript Controller**: Controls Terminal.app on macOS using AppleScript
  - **Subprocess Controller**: Universal terminal control method for all platforms
- Supports multiple server modes:
  - **STDIO Mode**: Communicates with clients via standard input/output
  - **SSE Mode**: Provides HTTP API via Server-Sent Events
- Offers multiple tools:
  - **Terminal Tool**: Executes commands and retrieves output
  - **File Tool**: Performs file operations (read, write, append, insert)
- Automatically detects the best terminal controller
- Seamless integration with Claude Desktop
- Docker deployment support
## Installation
### Prerequisites
- Python 3.10+
- [uv](https://github.com/astral-sh/uv) package management tool
If you haven't installed uv yet, you can install it with the following commands:
```bash
# On macOS using Homebrew
brew install uv
# On other platforms
pip install uv
```
### Installation with uv (Recommended)
Clone the repository and install dependencies using uv:
```bash
# Clone the repository
git clone https://github.com/yourusername/mcp-terminal.git
cd mcp-terminal
# Create a virtual environment and install basic dependencies
uv venv
source .venv/bin/activate  # On Windows use .venv\Scripts\activate
uv pip install -e .
# If you need iTerm2 support (macOS only)
uv pip install -e ".[iterm]"
# If you need development tools (testing, code formatting, etc.)
uv pip install -e ".[dev]"
```
> **Note: To use the iTerm2 controller, you must enable the Python API in iTerm2 settings.**
>
> Open iTerm2, go to `Preferences` → `General` → `Magic`, and check **Enable Python API** as shown below:
>
> ![Enable Python API in iTerm2](docs/iterm2-enable-python-api.png)
### Installation using Makefile
We provide a Makefile to simplify common operations:
```bash
# Install basic dependencies
make setup
# Install iTerm2 support
make setup-iterm
# Install development dependencies
make setup-dev
```
### Installation using Docker
We provide Docker support for quick deployment of the MCP Terminal server:
```bash
# Build the Docker image
docker build -t mcp-terminal .
# Run the Docker container (SSE mode, port 8000)
docker run -p 8000:8000 mcp-terminal
```
Or use docker-compose:
```bash
# Start the service
docker-compose up -d
# View logs
docker-compose logs -f
# Stop the service
docker-compose down
```
## Usage
### Running the MCP Terminal Server
There are multiple ways to start the server:
```bash
# Run directly with Python (defaults to stdio mode and auto-detected terminal controller)
python mcp_terminal.py
# Run with Makefile (stdio mode)
make run-stdio
# Run with Makefile (SSE mode)
make run-sse
# Use a specific controller
make run-iterm     # Use iTerm2 controller
make run-applescript  # Use AppleScript controller
make run-subprocess   # Use Subprocess controller
```
### Running with Docker
Run the MCP Terminal server using Docker (defaults to SSE mode and Subprocess controller):
```bash
# Run directly
docker run -p 8000:8000 mcp-terminal
# Use a custom port
docker run -p 9000:8000 mcp-terminal
# Mount current directory (for local file access)
docker run -p 8000:8000 -v $(pwd):/workspace mcp-terminal
```
Default configuration:
- Server mode: SSE
- Host: 0.0.0.0 (allows remote connections)
- Port: 8000
- Controller: subprocess (suitable for containerized environments)
You can customize the configuration by modifying the Dockerfile or docker-compose.yml file.
### Using Docker Containers as MCP Services with Claude or Other AI Tools
You can configure Docker containers as MCP services, allowing Claude or other MCP-compatible AI tools to use containerized tools directly. Here's an example of using Docker containers as MCP services in a Claude configuration file:
```json
{
  "mcp": {
    "servers": {
      "terminal": {
        "command": "docker",
        "args": [
          "run",
          "--rm",
          "-i",
          "--mount",
          "type=bind,src=${workspaceFolder},dst=/workspace",
          "mcp-terminal",
          "mcp-terminal",
          "--mode",
          "stdio"
        ]
      }
    }
  }
}
```
This configuration offers several benefits:
- Isolates tool execution environments using Docker containers
- Eliminates the need to install specific tools locally
- Maintains consistent tool versions and configurations across different environments
- Uses the `${workspaceFolder}` variable to mount the current working directory into the container
- Ensures containers are automatically removed after use with the `--rm` flag, keeping the environment clean
You can define multiple different MCP service containers as needed, each focused on specific functionality.
### Claude Desktop Integration Configuration Example
Here's an example configuration for Claude Desktop:
```json
{
  "mcpServers": {
    "terminal": {
      "command": "/Users/ann/Workspace/mcp-terminal/.venv/bin/python",
      "args": [
        "/Users/ann/Workspace/mcp-terminal/mcp_terminal.py",
        "--controller",
        "subprocess"
      ]
    }
  }
}
```
### Command Line Options
The server supports various command line options:
```bash
python mcp_terminal.py --help
```
Main options:
- `--controller` or `-c`: Specify terminal controller type (auto, iterm, applescript, subprocess)
- `--mode` or `-m`: Specify server mode (stdio, sse)
- `--host`: Specify host address for SSE mode
- `--port` or `-p`: Specify port for SSE mode
- `--log-level` or `-l`: Specify logging level
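As a rough illustration of how the options above could be declared, here is a minimal `argparse` sketch. It is not taken from `server.py`: the function name `build_parser` is hypothetical, and the defaults for `--host`, `--port`, and `--log-level` are assumptions made for the example (only the `stdio` and `auto` defaults are stated in this README).

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser mirroring the documented options (some defaults assumed)."""
    parser = argparse.ArgumentParser(prog="mcp-terminal")
    parser.add_argument(
        "--controller", "-c",
        choices=["auto", "iterm", "applescript", "subprocess"],
        default="auto",
        help="Terminal controller type",
    )
    parser.add_argument(
        "--mode", "-m",
        choices=["stdio", "sse"],
        default="stdio",
        help="Server mode",
    )
    # Assumed defaults: the real values may differ.
    parser.add_argument("--host", default="127.0.0.1", help="Host for SSE mode")
    parser.add_argument("--port", "-p", type=int, default=8000, help="Port for SSE mode")
    parser.add_argument("--log-level", "-l", default="INFO", help="Logging level")
    return parser


# Parse an explicit argument list instead of sys.argv for demonstration.
args = build_parser().parse_args(["-c", "subprocess", "-m", "sse", "-p", "9000"])
print(args.controller, args.mode, args.host, args.port, args.log_level)
```

Restricting `--controller` and `--mode` with `choices` makes the parser reject unsupported values before the server starts.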
## Integration with Claude Desktop
MCP Terminal can be seamlessly integrated with Claude Desktop to provide terminal control capabilities to Claude.
### Configuration Steps
1. **Start the MCP Terminal Server** (in stdio mode):
   ```bash
   # Run in a terminal window
   make run-stdio
   ```
2. **Configure Claude Desktop to use the MCP Server**:
   Open Claude Desktop, then:
   - Click the settings icon (usually in the top right corner)
   - Navigate to the "Extensions" or "Tools" tab
   - Enable the "Custom Tools" feature
   - Add the MCP Terminal configuration:
     - Tool name: Terminal
     - Tool path: Enter the full path to mcp_terminal.py
     - Use stdio mode: Check this option
     - Save the configuration
3. **Test the Integration**:
   In your conversation with Claude, you can now ask Claude to execute terminal commands, such as:
   - "Please list the files in my home directory"
   - "Check my current Python version"
   - "Create a new directory and write the current date to a file"
### Troubleshooting
If the integration is not working properly:
1. Make sure the MCP Terminal server is running
2. Check the log output for errors
3. Verify that the Claude Desktop tool configuration is correct
4. Try restarting both Claude Desktop and the MCP Terminal server
## API Specification
MCP Terminal provides the following MCP functions:
### execute_command
Executes a terminal command and retrieves the output.
**Parameters**:
- `command` (string): The command to execute
- `wait_for_output` (boolean, optional): Whether to wait for and return command output, defaults to true
- `timeout` (integer, optional): Timeout in seconds for waiting for output, defaults to 10
**Returns**:
- `success` (boolean): Whether the command executed successfully
- `output` (string, optional): The command output
- `error` (string, optional): Error message if the command failed
- `return_code` (integer, optional): The command return code
- `warning` (string, optional): Warning message
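Because most of the returned fields are optional, a caller should not assume they are present. The sketch below shows one way a client might summarize a result dictionary with the fields listed above. The helper name `summarize_result` and the summary format are hypothetical and not part of MCP Terminal.

```python
from typing import Any, Dict


def summarize_result(result: Dict[str, Any]) -> str:
    """Turn an execute_command result into a one-line summary."""
    if result.get("success"):
        text = (result.get("output") or "").strip()
        summary = f"ok (exit {result.get('return_code', 0)}): {text}"
    else:
        reason = (result.get("error") or "unknown error").strip()
        summary = f"failed (exit {result.get('return_code', 'n/a')}): {reason}"
    # The warning field is optional and may accompany either outcome.
    if result.get("warning"):
        summary += f" [warning: {result['warning']}]"
    return summary


print(summarize_result({"success": True, "output": "hello\n", "return_code": 0}))
print(summarize_result({"success": False, "error": "Command timed out after 10 seconds"}))
```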
### get_terminal_info
Gets terminal information.
**Parameters**: None
**Returns**:
- `terminal_type` (string): The type of terminal being used
- `platform` (string): The running platform
### file_modify
Writes, appends, or inserts content to a file.
**Parameters**:
- `filepath` (string): File path
- `content` (string): Content to write
- `mode` (string, optional): Writing mode, options are "overwrite", "append", or "insert", defaults to "overwrite"
- `position` (integer, optional): Insertion position when using "insert" mode
- `create_dirs` (boolean, optional): Whether to create directories if they don't exist, defaults to true
**Returns**:
- `success` (boolean): Whether the operation was successful
- `error` (string, optional): Error message if the operation failed
- `filepath` (string): Path to the file that was operated on
- `details` (object, optional): Additional operation details
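To make the three writing modes concrete, here is a small standalone sketch. It is an illustration rather than the code in `tools/file.py`: the function name `modify_file` is hypothetical, and it assumes that `position` is a zero-based line index and that omitting it in "insert" mode adds the content at the end of the file.

```python
import os
import tempfile
from typing import Any, Dict, Optional


def modify_file(
    filepath: str,
    content: str,
    mode: str = "overwrite",
    position: Optional[int] = None,
    create_dirs: bool = True,
) -> Dict[str, Any]:
    """Illustrative version of the overwrite, append, and insert modes."""
    try:
        parent = os.path.dirname(filepath)
        if create_dirs and parent:
            os.makedirs(parent, exist_ok=True)
        if mode == "overwrite":
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(content)
        elif mode == "append":
            with open(filepath, "a", encoding="utf-8") as f:
                f.write(content)
        elif mode == "insert":
            # Assumption: position is a zero-based line index; None means end of file.
            lines = []
            if os.path.exists(filepath):
                with open(filepath, "r", encoding="utf-8") as f:
                    lines = f.readlines()
            index = len(lines) if position is None else position
            lines.insert(index, content)
            with open(filepath, "w", encoding="utf-8") as f:
                f.writelines(lines)
        else:
            return {"success": False, "error": f"Unknown mode: {mode}", "filepath": filepath}
        return {"success": True, "filepath": filepath}
    except OSError as exc:
        return {"success": False, "error": str(exc), "filepath": filepath}


# Usage: build a file in three steps inside a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "notes", "demo.txt")
    modify_file(path, "first\nthird\n")
    modify_file(path, "second\n", mode="insert", position=1)
    modify_file(path, "fourth\n", mode="append")
    with open(path, encoding="utf-8") as f:
        print(f.read())
```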
## Security Considerations
MCP Terminal allows execution of arbitrary terminal commands, which may pose security risks. When using it in a production environment, you should:
1. Limit the server to accepting connections only from trusted sources
2. Consider implementing command whitelists or blacklists
3. Regularly audit executed commands
4. Run the server under a dedicated account with limited permissions
## Development
### Directory Structure
```
mcp-terminal/
├── mcp_terminal.py            # Entry point script
├── pyproject.toml             # Project configuration and dependencies
├── README.md                  # Project documentation
├── Makefile                   # Build and run commands
├── Dockerfile                 # Docker build configuration
├── docker-compose.yml         # Docker Compose configuration
├── src/
│   ├── __init__.py
│   └── mcp_terminal/
│       ├── __init__.py
│       ├── server.py          # Main server implementation
│       ├── controllers/
│       │   ├── __init__.py    # Controller factory and imports
│       │   ├── base.py        # Base controller interface
│       │   ├── subprocess.py  # Universal subprocess controller
│       │   ├── applescript.py # AppleScript controller
│       │   └── iterm.py       # iTerm2 API controller
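│       ├── security/
│       │   ├── __init__.py
│       │   └── command_filter.py  # Command whitelist/blacklist filter
│       └── tools/
│           ├── __init__.py
│           ├── terminal.py    # Terminal operation tool
│           └── file.py        # File operation tool
└── tests/                     # Test directory
    ├── __init__.py
    ├── test_command_filter.py
    ├── test_subprocess_controller.py
    └── test_terminal_security.py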
```
### Running Tests
```bash
# Run all tests using pytest
make test
# Or directly using pytest
pytest tests/
```
### Code Formatting
```bash
# Check code formatting
make lint
# Automatically format code
make format
```
## Contributing
Contributions are welcome! Please follow these steps:
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Submit a Pull Request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Security Features
### Command Whitelisting and Blacklisting
MCP Terminal provides security features to control which commands can be executed:
- **Whitelist Mode**: Only allows execution of commands that are explicitly listed in a whitelist file.
- **Blacklist Mode**: Blocks execution of commands that are listed in a blacklist file.
#### Configuration
You can configure command filtering using the following command-line options:
```bash
# Run with a blacklist (block listed commands)
python mcp_terminal.py --blacklist-file ./filter/blacklist.txt
# Run in whitelist mode (only allow listed commands)
python mcp_terminal.py --whitelist-file ./filter/whitelist.txt --whitelist-mode
# Use both whitelist and blacklist together
python mcp_terminal.py --whitelist-file ./filter/whitelist.txt --blacklist-file ./filter/blacklist.txt --whitelist-mode
```
#### Whitelist/Blacklist File Format
The whitelist and blacklist files use a simple format:
- One command per line
- Lines starting with `#` are treated as comments
- Empty lines are ignored
- Regular entries match the beginning of commands
- Entries starting with `^` are treated as regular expressions
Example whitelist file:
```
# Allowed commands
ls
pwd
cd
cat
# This is a regex pattern that matches any 'git' command
^git.*
```
Example blacklist file:
```
# Dangerous commands to block
sudo
rm -rf
# Block any command with 'eval'
^.*eval.*
```
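The file format described above can be sketched in a few lines of Python. This is a simplified illustration, not the code in `security/command_filter.py`: the function names `parse_rules`, `matches`, and `is_allowed` are hypothetical, and the rule that the blacklist takes precedence when both lists are used is an assumption.

```python
import re
from typing import List


def parse_rules(text: str) -> List[str]:
    """Return the non-empty, non-comment entries of a filter file."""
    rules = []
    for line in text.splitlines():
        entry = line.strip()
        if entry and not entry.startswith("#"):
            rules.append(entry)
    return rules


def matches(command: str, rules: List[str]) -> bool:
    """Check a command against prefix entries and ^-prefixed regex entries."""
    command = command.strip()
    for rule in rules:
        if rule.startswith("^"):
            if re.match(rule, command):
                return True
        elif command.startswith(rule):
            return True
    return False


def is_allowed(
    command: str,
    whitelist: List[str],
    blacklist: List[str],
    whitelist_mode: bool = False,
) -> bool:
    """Decide whether a command may run under the given lists."""
    # Assumption: the blacklist always wins, even in whitelist mode.
    if matches(command, blacklist):
        return False
    if whitelist_mode:
        return matches(command, whitelist)
    return True


whitelist = parse_rules("# Allowed commands\nls\npwd\n\n^git.*\n")
blacklist = parse_rules("# Dangerous commands to block\nsudo\nrm -rf\n^.*eval.*\n")
print(is_allowed("git status", whitelist, blacklist, whitelist_mode=True))
print(is_allowed("rm -rf /tmp/x", whitelist, blacklist))
```

Note that in this sketch a plain entry is a simple string prefix, so `ls` would also match `lsof`. If that is too permissive, a regular expression entry such as `^ls(\s|$)` gives tighter control.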
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
"""MCP Terminal package."""
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
"""Test package for MCP Terminal."""
```
--------------------------------------------------------------------------------
/src/mcp_terminal/security/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Provides security features such as command whitelisting and blacklisting.
"""
```
--------------------------------------------------------------------------------
/src/mcp_terminal/__init__.py:
--------------------------------------------------------------------------------
```python
"""
MCP Terminal - A terminal control server using Model Context Protocol (MCP).
"""
__version__ = "0.1.0"
```
--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------
```json
{
    "python.testing.pytestArgs": [
        "tests"
    ],
    "python.testing.unittestEnabled": false,
    "python.testing.pytestEnabled": true
}
```
--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------
```yaml
services:
  mcp-terminal:
    build: .
    container_name: mcp-terminal
    volumes:
      - .:/workspace
    ports:
      - "8000:8000"
    environment:
      - PYTHONUNBUFFERED=1
```
--------------------------------------------------------------------------------
/src/mcp_terminal/tools/__init__.py:
--------------------------------------------------------------------------------
```python
"""MCP Terminal tools package."""
from .file import FileOperationResponse, FileTool, WriteMode
from .terminal import TerminalTool
__all__ = ["FileTool", "WriteMode", "FileOperationResponse", "TerminalTool"]
```
--------------------------------------------------------------------------------
/mcp_terminal.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Entry point script for MCP Terminal server.
"""
import os
import sys
# Add the src directory to the Python path
src_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src")
sys.path.insert(0, src_path)
from mcp_terminal.server import main
if __name__ == "__main__":
    sys.exit(main())
```
--------------------------------------------------------------------------------
/filter/whitelist.txt:
--------------------------------------------------------------------------------
```
# MCP Terminal Whitelist
# This file contains allowed commands when running in whitelist mode
# Lines starting with # are comments and are ignored
# Empty lines are also ignored
# Regular commands match the beginning of the executed command
# Regex patterns start with ^ and match the entire command
# Basic file operations
ls
pwd
cd
cat
less
head
tail
grep
find
mkdir
rm
cp
mv
# Git commands
git
# Editor commands
nano
vim
vi
emacs
# File viewing and manipulation
touch
wc
diff
sed
awk
# Process management (safe ones)
ps
top
htop
# Network utilities (read-only)
ping
dig
nslookup
netstat
ifconfig
ip
# System information
uname
whoami
id
date
uptime
df
du 
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PATH="/root/.local/bin:$PATH"
# Set working directory
WORKDIR /app
# Copy uv
COPY --from=ghcr.io/astral-sh/uv:0.4.15 /uv /bin/uv
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY pyproject.toml uv.lock* ./
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-install-project
# Copy the rest of the code
COPY . .
RUN uv tool install . 
# Default command
CMD ["mcp-terminal", "--mode", "sse", "--host", "0.0.0.0", "--port", "8000"]
```
--------------------------------------------------------------------------------
/filter/blacklist.txt:
--------------------------------------------------------------------------------
```
# MCP Terminal Blacklist
# This file contains blocked commands
# Lines starting with # are comments and are ignored
# Empty lines are also ignored
# Regular commands match the beginning of the executed command
# Regex patterns start with ^ and match the entire command
# System modifying commands
sudo
su
apt
apt-get
yum
dnf
brew
npm
pip
gem
# Potentially harmful commands
rm -rf
^rm\s+-rf.*
chmod
chown
dd
mkfs
fdisk
mount
umount
# Network modification
ifconfig
route
iptables
ufw
fw
# Service management
systemctl
service
init
shutdown
reboot
halt
poweroff
init
# Shell escapes/command execution
eval
exec
source
bash -c
sh -c
python -c
perl -e
ruby -e
php -r
# File transfer utilities
scp
sftp
ftp
rsync
wget
curl 
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mcp-terminal"
version = "0.1.0"
description = "MCP Terminal Server for controlling terminal operations"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
    { name = "MCP Terminal Developer" }
]
dependencies = [
    "iterm2>=2.9",
    "mcp[cli]>=1.7.1",
    "pydantic==2.10.4",
]
[project.optional-dependencies]
iterm = ["iterm2>=1.0.0"]
dev = [
    "pytest>=7.0.0",
    "black>=23.0.0",
    "isort>=5.12.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_terminal"]
[tool.black]
line-length = 88
target-version = ["py310"]
[tool.isort]
profile = "black"
line_length = 88
[project.scripts]
mcp-terminal = "mcp_terminal.server:main"
[tool.uv]
package = true
[dependency-groups]
dev = [
    "pytest>=8.3.5",
]
```
--------------------------------------------------------------------------------
/src/mcp_terminal/controllers/base.py:
--------------------------------------------------------------------------------
```python
"""
Base terminal controller interface.
All terminal controllers should implement this interface.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseTerminalController(ABC):
    """Base interface for terminal controllers."""
    @abstractmethod
    async def execute_command(
        self, command: str, wait_for_output: bool = True, timeout: int = 10
    ) -> Dict[str, Any]:
        """
        Execute a command in the terminal.
        Args:
            command: The command to execute
            wait_for_output: Whether to wait for output
            timeout: Timeout in seconds
        Returns:
            A dictionary with the result of the command execution
        """
        pass
    @abstractmethod
    async def get_terminal_type(self) -> str:
        """
        Get the terminal type.
        Returns:
            The terminal type
        """
        pass
    @abstractmethod
    async def cleanup(self) -> None:
        """
        Clean up resources.
        """
        pass
```
--------------------------------------------------------------------------------
/prompt.md:
--------------------------------------------------------------------------------
```markdown
# Basic Command Line Coding Assistant
You are a command line coding assistant. Help me write and manage code using these essential terminal commands:
## Basic File Operations
- View files recursively: `tree -fiI ".venv|node_modules|.git|dist|<MORE_IGNORE_PATTERNS>"`
- View file contents: `cat file.py`
- Search in files: `grep "function" file.py`
- Search recursively: `grep -r "pattern" directory/`
- Find files by name: `find . -name "*.py"`
- Modify file using: `file_modify`
- Move/rename files: `mv oldname.py newname.py`
## Assistant Behavior
- Directly modify files without outputting code blocks
- Read and write all documentation in the project's `./docs` directory
- Ensure code is not redundant or duplicative
- Prioritize implementation logic and ask user when facing decisions
- Maintain existing code style and naming conventions when modifying files
- Use concise commands to execute operations efficiently
- Consider performance implications when suggesting solutions
- Provide clear explanation of steps taken during complex operations
- Verify commands before execution, especially for destructive operations
- Suggest file organization improvements when appropriate
- Always write code in English, including all code, comments, and strings
- After fully understanding responsibilities, respond with "Ready to start coding now"
## Project Preferences
- Python: Use uv sync for dependency management
  - Create venv: `uv venv`
  - Install packages: `uv sync`
  - Add dependencies: `uv add <package>`
- Default Project Files:
  - Create Makefile
  - Create .envrc
- Project dir: /Users/ann/Workspace/mcp-terminal
- Project language: Python, Docker Compose
```
--------------------------------------------------------------------------------
/tests/test_subprocess_controller.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the subprocess terminal controller.
"""
import asyncio
import os
import sys
import unittest
from unittest import IsolatedAsyncioTestCase
# Add both src and project root to Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
src_path = os.path.join(project_root, "src")
sys.path.insert(0, project_root)
sys.path.insert(0, src_path)
from mcp_terminal.controllers.subprocess import SubprocessTerminalController
class TestSubprocessTerminalController(IsolatedAsyncioTestCase):
    """Test cases for the subprocess terminal controller."""
    async def asyncSetUp(self):
        """Set up the test case."""
        self.controller = SubprocessTerminalController()
    async def test_execute_command(self):
        """Test executing a simple command."""
        result = await self.controller.execute_command("echo 'Hello, World!'")
        self.assertTrue(result["success"])
        self.assertIn("Hello, World!", result["output"])
    async def test_execute_command_with_error(self):
        """Test executing a command that produces an error."""
        result = await self.controller.execute_command("ls /nonexistent")
        self.assertFalse(result["success"])
        self.assertTrue(result["error"])
    async def test_get_terminal_type(self):
        """Test getting the terminal type."""
        terminal_type = await self.controller.get_terminal_type()
        self.assertEqual(terminal_type, "subprocess")
    async def test_cleanup(self):
        """Test cleaning up resources."""
        await self.controller.cleanup()
        # No assertions needed as cleanup does nothing for subprocess controller
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/src/mcp_terminal/controllers/__init__.py:
--------------------------------------------------------------------------------
```python
"""Terminal controllers package."""
# Conditionally import platform-specific controllers
import platform
from .base import BaseTerminalController
from .subprocess import SubprocessTerminalController
if platform.system() == "Darwin":  # macOS
    from .applescript import AppleScriptTerminalController
    # Try to import iTerm2 controller if the package is available
    try:
        import iterm2
        from .iterm import ITermController
        ITERM_AVAILABLE = True
    except ImportError:
        ITERM_AVAILABLE = False
else:
    ITERM_AVAILABLE = False
def get_controller(controller_type=None):
    """
    Factory function to get a terminal controller based on the specified type or platform.
    Args:
        controller_type: The type of controller to get ("iterm", "applescript", "subprocess")
                        or None to auto-detect
    Returns:
        A terminal controller instance
    """
    system = platform.system()
    # If controller type is specified, try to use it
    if controller_type:
        if controller_type == "iterm" and system == "Darwin":
            if ITERM_AVAILABLE:
                return ITermController()
            else:
                raise ImportError(
                    "iTerm2 API not available. Install with 'pip install iterm2'"
                )
        elif controller_type == "applescript" and system == "Darwin":
            return AppleScriptTerminalController()
        elif controller_type == "subprocess":
            return SubprocessTerminalController()
        else:
            raise ValueError(
                f"Controller type '{controller_type}' not supported on {system}"
            )
    # Auto-detect the best controller
    if system == "Darwin":
        if ITERM_AVAILABLE:
            try:
                # Check whether iTerm2 is currently running (a matching process exists)
                import subprocess
                result = subprocess.run(
                    [
                        "osascript",
                        "-e",
                        'tell application "System Events" to exists application process "iTerm2"',
                    ],
                    capture_output=True,
                    text=True,
                )
                if "true" in result.stdout.lower():
                    return ITermController()
            except Exception:
                pass
        # Fall back to AppleScript for macOS Terminal
        return AppleScriptTerminalController()
    # Default to subprocess controller for all other platforms
    return SubprocessTerminalController()
```
--------------------------------------------------------------------------------
/src/mcp_terminal/controllers/subprocess.py:
--------------------------------------------------------------------------------
```python
"""
Subprocess terminal controller.
Uses Python's subprocess module to execute commands.
Works on all platforms.
"""
import asyncio
from typing import Any, Dict
from mcp_terminal.controllers.base import BaseTerminalController
class SubprocessTerminalController(BaseTerminalController):
    """Terminal controller using subprocess."""
    async def execute_command(
        self, command: str, wait_for_output: bool = True, timeout: int = 10
    ) -> Dict[str, Any]:
        """
        Execute a command using subprocess.
        Args:
            command: The command to execute
            wait_for_output: Whether to wait for output
            timeout: Timeout in seconds
        Returns:
            A dictionary with the result of the command execution
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            if wait_for_output:
                try:
                    # Wait for the process to complete with timeout
                    stdout, stderr = await asyncio.wait_for(
                        process.communicate(), timeout=timeout
                    )
                    return {
                        "success": process.returncode == 0,
                        "output": stdout.decode("utf-8", errors="replace"),
                        "error": stderr.decode("utf-8", errors="replace"),
                        "return_code": process.returncode,
                    }
                except asyncio.TimeoutError:
                    # Kill the process if it times out
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass
                    return {
                        "success": False,
                        "error": f"Command timed out after {timeout} seconds",
                    }
            else:
                # Don't wait for output
                return {
                    "success": True,
                    "output": "Command sent (output not captured)",
                }
        except Exception as e:
            return {
                "success": False,
                "error": f"Error executing command: {str(e)}",
            }
    async def get_terminal_type(self) -> str:
        """
        Get the terminal type.
        Returns:
            The terminal type
        """
        return "subprocess"
    async def cleanup(self) -> None:
        """
        Clean up resources.
        """
        pass  # No resources to clean up
```
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
# Changelog
## Change Types
- **Features**: New features or improvements
- **Bug Fixes**: Bug fixes and patches
- **Breaking Changes**: Incompatible changes
## Commit Guidelines
All notable changes to this project will be documented in this file. Follow the [Conventional Commits](https://www.conventionalcommits.org/) specification when making commits.
---
### [0.1.2](https://github.com/sichang824/mcp-terminal/compare/0.1.1...0.1.2) (2025-05-16)
### Commits
* chore: update dependencies and testing configuration ([9faafe7](https://github.com/sichang824/mcp-terminal/commit/9faafe7bbdaf557a4cc1106739170f6aadefdf73))
* Merge pull request #5 from sichang824/add-security-tests ([9678d3f](https://github.com/sichang824/mcp-terminal/commit/9678d3f7e7688f5a5d14f0fab6a0a27b8f4c8084))
* Merge pull request #3 from npkanaka/feature/security-changes ([00a70e8](https://github.com/sichang824/mcp-terminal/commit/00a70e82ab9b83de8da461cbb08193323dc96073))
* test: Add automated tests for command filtering security features ([377d81c](https://github.com/sichang824/mcp-terminal/commit/377d81cd443799debbecae54869c8d5f34ce4bed))
* feat: add Nix shell configuration and minor whitespace adjustment ([e9cc1eb](https://github.com/sichang824/mcp-terminal/commit/e9cc1eb1817f9af855b0b7b0beecdf52b1d3c2e7))
* Implement security enhancements for MCP Terminal ([6b1cf0c](https://github.com/sichang824/mcp-terminal/commit/6b1cf0c6d0d69e201d6ca70c7ba39763279fce6f))
* Enhance README and add terminal info retrieval features ([8f19399](https://github.com/sichang824/mcp-terminal/commit/8f193997e70549ec5106a9453f2e338e5ecb548f))
* Merge pull request #2 from punkpeye/glama-badge ([c962c17](https://github.com/sichang824/mcp-terminal/commit/c962c17682eb2e3db3a16b730c167c75eab660b7))
* add MCP server badge ([8dc6285](https://github.com/sichang824/mcp-terminal/commit/8dc6285001a8f62395b01970058eac727b85c10e))
* Update README files to include alignment and language links ([068437a](https://github.com/sichang824/mcp-terminal/commit/068437a995e94272620707bf29fb53e8ce753e46))
* Add Docker container configuration for MCP services in README ([6a1d76c](https://github.com/sichang824/mcp-terminal/commit/6a1d76c8e8bea7cf8092f01e8cbb19c5c96d52ce))
* Update README to include Docker deployment instructions and configuration details ([c68c013](https://github.com/sichang824/mcp-terminal/commit/c68c0130ed565cfda3fcc5357b1b7ac4329c5182))
* Add Docker support with Dockerfile and docker-compose.yml ([d548b30](https://github.com/sichang824/mcp-terminal/commit/d548b30420c566eeac7d2609845d2a5a03d20af9))
* Add English README for MCP Terminal with detailed features, installation instructions, usage examples, and API specifications ([21889d6](https://github.com/sichang824/mcp-terminal/commit/21889d6b59fba4c937e31f184ce721d548a869f4))
```
--------------------------------------------------------------------------------
/src/mcp_terminal/tools/file.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
File tool for MCP.
Provides file operations through the MCP interface.
"""
import logging
import os
from enum import Enum
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("MCP:Terminal:FileTool")
class WriteMode(str, Enum):
    """Enum representing different file writing modes."""
    OVERWRITE = "overwrite"  # Overwrite the entire file
    APPEND = "append"  # Append to the end of the file
    INSERT = "insert"  # Insert at a specific position
class FileOperationResponse(BaseModel):
    """Response model for file operations."""
    success: bool = Field(..., description="Whether the operation was successful")
    error: Optional[str] = Field(
        None, description="Error message if the operation failed"
    )
    filepath: str = Field(..., description="Path to the file that was operated on")
    details: Optional[Dict[str, Any]] = Field(
        None, description="Additional operation details"
    )
class FileTool:
    """
    MCP tool for file operations.
    This tool provides functions to perform various file operations
    such as reading, writing, and manipulating files.
    """
    name = "file"
    def register_mcp(self, mcp: FastMCP) -> None:
        """Register the file tool with the MCP server."""
        @mcp.tool(name="file_modify", description="Writes content to a file")
        async def file_modify(
            filepath: str,
            content: str,
            mode: WriteMode = WriteMode.OVERWRITE,
            position: Optional[int] = None,
            create_dirs: bool = True,
        ) -> FileOperationResponse:
            try:
                # Create directories if they don't exist
                if create_dirs:
                    directory = os.path.dirname(filepath)
                    if directory:
                        # exist_ok avoids a race if the directory appears in between
                        os.makedirs(directory, exist_ok=True)
                # Handle different write modes
                details = {"mode": mode}
                if mode == WriteMode.INSERT:
                    if position is None:
                        raise ValueError(
                            "Position must be specified when using INSERT mode"
                        )
                    # Read existing content if file exists
                    existing_content = ""
                    if os.path.exists(filepath):
                        with open(filepath, "r", encoding="utf-8") as f:
                            existing_content = f.read()
                    # Insert the new content at the specified position
                    position = min(position, len(existing_content))
                    new_content = (
                        existing_content[:position]
                        + content
                        + existing_content[position:]
                    )
                    details["position"] = position
                    # Write the combined content
                    with open(filepath, "w", encoding="utf-8") as f:
                        f.write(new_content)
                elif mode == WriteMode.APPEND:
                    # Append to file
                    with open(filepath, "a", encoding="utf-8") as f:
                        f.write(content)
                else:  # OVERWRITE
                    # Overwrite file
                    with open(filepath, "w", encoding="utf-8") as f:
                        f.write(content)
                return FileOperationResponse(
                    success=True, filepath=filepath, details=details
                )
            except Exception as e:
                logger.error(f"Error writing to file {filepath}: {e}")
                return FileOperationResponse(
                    success=False,
                    error=f"Error writing to file: {str(e)}",
                    filepath=filepath,
                )
```
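The `INSERT` mode above splices text at a character offset and clamps offsets past the end of the file. A minimal sketch of that splice on plain strings, assuming a hypothetical helper name `insert_at` (it is not part of the package):

```python
def insert_at(existing: str, content: str, position: int) -> str:
    """Splice content into existing at a character offset, clamped to the end."""
    # Offsets beyond the end behave like an append
    position = min(position, len(existing))
    return existing[:position] + content + existing[position:]


if __name__ == "__main__":
    print(insert_at("hello world", ", big", 5))  # hello, big world
    print(insert_at("abc", "XYZ", 99))  # abcXYZ
```

Only the upper bound is clamped, so a negative `position` follows Python slice semantics and counts from the end of the text. The offset is measured in characters of the decoded UTF-8 text, not in bytes.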
--------------------------------------------------------------------------------
/src/mcp_terminal/security/command_filter.py:
--------------------------------------------------------------------------------
```python
"""
Command filtering module for MCP Terminal.
Provides functionality to whitelist and blacklist commands.
"""
import logging
import os
import re
from typing import List, Optional, Set, Tuple
# Configure logging
logger = logging.getLogger("MCP:Terminal:Security")
class CommandFilter:
    """
    Filter for terminal commands based on whitelist and blacklist.
    """
    def __init__(
        self,
        whitelist_file: Optional[str] = None,
        blacklist_file: Optional[str] = None,
        whitelist_mode: bool = False,
    ):
        """
        Initialize command filter.
        Args:
            whitelist_file: Path to whitelist file
            blacklist_file: Path to blacklist file
            whitelist_mode: If True, only whitelisted commands are allowed.
                           If False, all commands except blacklisted ones are allowed.
        """
        self.whitelist_file = whitelist_file
        self.blacklist_file = blacklist_file
        self.whitelist_mode = whitelist_mode
        self.whitelist: Set[str] = set()
        self.blacklist: Set[str] = set()
        # Load lists if files are provided
        if whitelist_file:
            self._load_list(whitelist_file, self.whitelist)
        if blacklist_file:
            self._load_list(blacklist_file, self.blacklist)
    def _load_list(self, file_path: str, command_set: Set[str]) -> None:
        """
        Load commands from a file into a set.
        Args:
            file_path: Path to the command list file
            command_set: Set to load the commands into
        """
        if not file_path or not os.path.exists(file_path):
            logger.warning(f"Command list file not found: {file_path}")
            return
        try:
            with open(file_path, "r") as f:
                for line in f:
                    # Skip comments and empty lines
                    line = line.strip()
                    if line and not line.startswith("#"):
                        command_set.add(line)
            logger.info(f"Loaded {len(command_set)} commands from {file_path}")
        except Exception as e:
            logger.error(f"Error loading command list from {file_path}: {e}")
    def is_command_allowed(self, command: str) -> Tuple[bool, Optional[str]]:
        """
        Check if a command is allowed based on whitelist/blacklist rules.
        Args:
            command: The command to check
        Returns:
            Tuple of (is_allowed, reason_if_not_allowed)
        """
        # Extract the base command (the first word before any arguments);
        # a blank or whitespace-only command yields an empty base command
        parts = command.split()
        base_command = parts[0] if parts else ""
        # In whitelist mode, command must be in the whitelist
        if self.whitelist_mode:
            if not self.whitelist:
                logger.warning("Whitelist mode enabled but whitelist is empty")
                return False, "Whitelist mode enabled but whitelist is empty"
            for allowed_cmd in self.whitelist:
                # Match whole command or pattern
                if base_command == allowed_cmd or self._match_pattern(
                    command, allowed_cmd
                ):
                    return True, None
            return False, f"Command not in whitelist: {base_command}"
        # In blacklist mode, command must not be in the blacklist
        else:
            for blocked_cmd in self.blacklist:
                # Match whole command or pattern
                if base_command == blocked_cmd or self._match_pattern(
                    command, blocked_cmd
                ):
                    return False, f"Command blacklisted: {base_command}"
            return True, None
    def _match_pattern(self, command: str, pattern: str) -> bool:
        """
        Check if a command matches a pattern.
        Patterns starting with ^ are treated as regular expressions.
        Args:
            command: The command to check
            pattern: The pattern to match against
        Returns:
            True if command matches the pattern
        """
        if pattern.startswith("^"):
            try:
                return bool(re.match(pattern, command))
            except re.error:
                logger.error(f"Invalid regex pattern: {pattern}")
                return False
        return False
```
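The matching rule used by `is_command_allowed` can be restated as a standalone sketch: a plain entry is compared with the first whitespace-separated word only, while an entry starting with `^` is applied as a regular expression to the whole command. The helper names `matches_entry` and `is_allowed` are hypothetical and exist only for illustration.

```python
import re
from typing import Iterable


def matches_entry(command: str, entry: str) -> bool:
    """Return True if the command matches one whitelist or blacklist entry."""
    parts = command.split()
    base = parts[0] if parts else ""
    if base == entry:
        return True  # plain entry: only the first word is compared
    if entry.startswith("^"):
        try:
            return re.match(entry, command) is not None
        except re.error:
            return False  # invalid patterns never match
    return False


def is_allowed(command: str, entries: Iterable[str], whitelist_mode: bool) -> bool:
    """Whitelist mode requires a match; blacklist mode forbids one."""
    hit = any(matches_entry(command, entry) for entry in entries)
    return hit if whitelist_mode else not hit


if __name__ == "__main__":
    print(is_allowed("git status", ["^git (pull|push|status)$"], True))
    # Chained commands are judged by their first word only
    print(is_allowed("ls && rm -rf tmp", ["ls"], True))
```

Because plain entries look at the first word only, a chained command such as `ls && rm -rf tmp` passes a whitelist containing `ls`, and `echo hi && rm -rf tmp` is not caught by a blacklist containing `rm`. Anchored patterns with a trailing `$` are one way to constrain the full command line.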
--------------------------------------------------------------------------------
/src/mcp_terminal/controllers/applescript.py:
--------------------------------------------------------------------------------
```python
"""
AppleScript terminal controller.
Uses AppleScript to control the macOS Terminal app.
"""
import asyncio
import platform
import time
from typing import Any, Dict, List
from mcp_terminal.controllers.base import BaseTerminalController
class AppleScriptTerminalController(BaseTerminalController):
    """Terminal controller using AppleScript for macOS Terminal."""
    def __init__(self):
        """Initialize the AppleScript terminal controller."""
        if platform.system() != "Darwin":
            raise RuntimeError("AppleScript controller only works on macOS")
    async def execute_command(
        self, command: str, wait_for_output: bool = True, timeout: int = 10
    ) -> Dict[str, Any]:
        """
        Execute a command in macOS Terminal using AppleScript.
        Args:
            command: The command to execute
            wait_for_output: Whether to wait for output
            timeout: Timeout in seconds
        Returns:
            A dictionary with the result of the command execution
        """
        try:
            # Escape backslashes first, then double quotes, for AppleScript
            escaped_command = command.replace("\\", "\\\\").replace('"', '\\"')
            # Create AppleScript to execute the command
            script = f"""
            tell application "Terminal"
                activate
                if not (exists window 1) then
                    do script ""
                else
                    do script "" in window 1
                end if
                do script "{escaped_command}" in window 1
            end tell
            """
            # Run the AppleScript
            proc = await asyncio.create_subprocess_exec(
                "osascript",
                "-e",
                script,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Wait for the AppleScript to complete
            await proc.wait()
            if wait_for_output:
                # Get output from the Terminal
                return await self._get_terminal_output(command, timeout)
            else:
                return {
                    "success": True,
                    "output": "Command sent (output not captured)",
                }
        except Exception as e:
            return {
                "success": False,
                "error": f"Error executing command: {str(e)}",
            }
    async def _get_terminal_output(self, command: str, timeout: int) -> Dict[str, Any]:
        """
        Get output from the Terminal app.
        Args:
            command: The command that was executed
            timeout: Timeout in seconds
        Returns:
            A dictionary with the output
        """
        # AppleScript to get Terminal contents
        output_script = """
        tell application "Terminal"
            contents of window 1
        end tell
        """
        start_time = time.time()
        output = ""
        # Poll for output until timeout
        while time.time() - start_time < timeout:
            # Get current terminal content
            proc = await asyncio.create_subprocess_exec(
                "osascript",
                "-e",
                output_script,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
            current_output = stdout.decode("utf-8", errors="replace").strip()
            # Check if we have new output
            if current_output and len(current_output) > len(output):
                output = current_output
            # Check if command has completed (output contains command and has more content)
            if command in current_output and len(current_output) > len(command):
                # Try to extract just the output after the command
                try:
                    output_lines = output.split("\n")
                    cmd_index = self._find_command_index(output_lines, command)
                    if cmd_index >= 0 and cmd_index < len(output_lines) - 1:
                        result = "\n".join(output_lines[cmd_index + 1 :])
                        return {"success": True, "output": result}
                except Exception:
                    pass  # Fall back to returning all output
                return {"success": True, "output": output}
            # Wait before checking again
            await asyncio.sleep(0.5)
        # If we get here, we timed out
        return {
            "success": True,
            "output": output,
            "warning": f"Command may still be running after {timeout} seconds",
        }
    def _find_command_index(self, lines: List[str], command: str) -> int:
        """
        Find the index of the line containing the command.
        Args:
            lines: List of output lines
            command: Command to find
        Returns:
            Index of the line containing the command, or -1 if not found
        """
        for i, line in enumerate(lines):
            if command in line:
                return i
        return -1
    async def get_terminal_type(self) -> str:
        """
        Get the terminal type.
        Returns:
            The terminal type
        """
        return "macOS Terminal"
    async def cleanup(self) -> None:
        """
        Clean up resources.
        """
        pass  # No resources to clean up
```
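The command is embedded in an AppleScript string literal, where both the backslash and the double quote need a backslash escape. A minimal sketch of that escaping step, assuming hypothetical helper names `escape_for_applescript` and `build_do_script` that are not part of the package:

```python
def escape_for_applescript(text: str) -> str:
    """Escape text for use inside a double-quoted AppleScript string literal."""
    # Backslashes go first so the ones added for quotes are not doubled again
    return text.replace("\\", "\\\\").replace('"', '\\"')


def build_do_script(command: str) -> str:
    """Build a one-line Terminal 'do script' statement for the command."""
    return f'tell application "Terminal" to do script "{escape_for_applescript(command)}"'


if __name__ == "__main__":
    print(build_do_script('echo "a\\b"'))
```

The order of the two replacements matters: escaping quotes first would introduce backslashes that the second pass then doubles, which changes the command that Terminal receives.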
--------------------------------------------------------------------------------
/tests/test_terminal_security.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the terminal tool security features.
"""
import os
import sys
import tempfile
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
# Add both src and project root to Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
src_path = os.path.join(project_root, "src")
sys.path.insert(0, project_root)
sys.path.insert(0, src_path)
from mcp_terminal.tools.terminal import TerminalTool
class TestTerminalToolSecurity(unittest.IsolatedAsyncioTestCase):
    """Test cases for the terminal tool security features."""
    def setUp(self):
        """Set up the test case."""
        # Create temporary files for whitelist and blacklist
        self.whitelist_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        self.blacklist_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        # Write test data to whitelist
        self.whitelist_file.write("# Allowed commands\n")
        self.whitelist_file.write("ls\n")
        self.whitelist_file.write("echo\n")
        self.whitelist_file.write("cat\n")
        self.whitelist_file.flush()
        # Write test data to blacklist
        self.blacklist_file.write("# Blocked commands\n")
        self.blacklist_file.write("rm\n")
        self.blacklist_file.write("sudo\n")
        self.blacklist_file.write("eval\n")
        self.blacklist_file.flush()
        # Patch get_controller in the module TerminalTool is imported from
        self.controller_patcher = patch(
            "mcp_terminal.tools.terminal.get_controller"
        )
        self.mock_get_controller = self.controller_patcher.start()
        self.mock_controller = AsyncMock()
        self.mock_get_controller.return_value = self.mock_controller
        # Mock successful command execution
        self.mock_controller.execute_command.return_value = {
            "success": True,
            "output": "Command output",
            "error": None,
        }
    def tearDown(self):
        """Clean up test resources."""
        # Close and remove temporary files
        self.whitelist_file.close()
        self.blacklist_file.close()
        os.unlink(self.whitelist_file.name)
        os.unlink(self.blacklist_file.name)
        # Stop patches
        self.controller_patcher.stop()
    @patch("mcp_terminal.tools.terminal.FastMCP")
    async def test_allowed_command_whitelist_mode(self, mock_fastmcp):
        """Test executing allowed command in whitelist mode."""
        # Setup terminal tool with whitelist mode
        tool = TerminalTool(
            whitelist_file=self.whitelist_file.name, whitelist_mode=True
        )
        # Execute an allowed command
        response = await tool.execute_command(command="ls -la")
        # Check that controller was called
        self.mock_controller.execute_command.assert_called_once_with(
            "ls -la", timeout=10
        )
        self.assertTrue(response.success)
    @patch("mcp_terminal.tools.terminal.FastMCP")
    async def test_blocked_command_whitelist_mode(self, mock_fastmcp):
        """Test executing blocked command in whitelist mode."""
        # Setup terminal tool with whitelist mode
        tool = TerminalTool(
            whitelist_file=self.whitelist_file.name, whitelist_mode=True
        )
        # Execute a blocked command
        response = await tool.execute_command(command="rm -rf /")
        # Check that controller was NOT called
        self.mock_controller.execute_command.assert_not_called()
        self.assertFalse(response.success)
        self.assertIn("not allowed", response.error)
    @patch("mcp_terminal.tools.terminal.FastMCP")
    async def test_allowed_command_blacklist_mode(self, mock_fastmcp):
        """Test executing allowed command in blacklist mode."""
        # Setup terminal tool with blacklist mode
        tool = TerminalTool(
            blacklist_file=self.blacklist_file.name, whitelist_mode=False
        )
        # Execute an allowed command
        response = await tool.execute_command(command="grep 'test' file.txt")
        # Check that controller was called
        self.mock_controller.execute_command.assert_called_once_with(
            "grep 'test' file.txt", timeout=10
        )
        self.assertTrue(response.success)
    @patch("mcp_terminal.tools.terminal.FastMCP")
    async def test_blocked_command_blacklist_mode(self, mock_fastmcp):
        """Test executing blocked command in blacklist mode."""
        # Setup terminal tool with blacklist mode
        tool = TerminalTool(
            blacklist_file=self.blacklist_file.name, whitelist_mode=False
        )
        # Execute a blocked command
        response = await tool.execute_command(command="sudo apt-get update")
        # Check that controller was NOT called
        self.mock_controller.execute_command.assert_not_called()
        self.assertFalse(response.success)
        self.assertIn("not allowed", response.error)
    @patch("mcp_terminal.tools.terminal.FastMCP")
    async def test_both_whitelist_blacklist(self, mock_fastmcp):
        """Test using both whitelist and blacklist."""
        # Setup terminal tool with both whitelist and blacklist
        tool = TerminalTool(
            whitelist_file=self.whitelist_file.name,
            blacklist_file=self.blacklist_file.name,
            whitelist_mode=True,  # Whitelist mode takes precedence
        )
        # Test command in whitelist
        response = await tool.execute_command(command="echo Hello")
        self.assertTrue(response.success)
        # Test command in blacklist but also in whitelist
        # In whitelist mode, this should be allowed since whitelist has priority
        self.mock_controller.execute_command.reset_mock()
        # First add this command to the whitelist
        with open(self.whitelist_file.name, "a") as f:
            f.write("rm\n")  # This is also in blacklist
        # Recreate tool to load updated whitelist
        tool = TerminalTool(
            whitelist_file=self.whitelist_file.name,
            blacklist_file=self.blacklist_file.name,
            whitelist_mode=True,
        )
        response = await tool.execute_command(command="rm file.txt")
        self.assertTrue(response.success)
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_command_filter.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the command filtering functionality.
"""
import os
import sys
import tempfile
import unittest
# Add both src and project root to Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
src_path = os.path.join(project_root, "src")
sys.path.insert(0, project_root)
sys.path.insert(0, src_path)
# Import from package
from mcp_terminal.security.command_filter import CommandFilter
class TestCommandFilter(unittest.TestCase):
    """Test cases for the command filtering functionality."""
    def setUp(self):
        """Set up the test case."""
        # Create temporary files for whitelist and blacklist
        self.whitelist_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        self.blacklist_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        # Write test data to whitelist
        self.whitelist_file.write("# Whitelist test file\n")
        self.whitelist_file.write("ls\n")
        self.whitelist_file.write("cat\n")
        self.whitelist_file.write("pwd\n")
        self.whitelist_file.write("^git.*\n")  # Regex pattern for git commands
        self.whitelist_file.flush()
        # Write test data to blacklist
        self.blacklist_file.write("# Blacklist test file\n")
        self.blacklist_file.write("rm\n")
        self.blacklist_file.write("sudo\n")
        self.blacklist_file.write("^.*eval.*\n")  # Regex pattern containing eval
        self.blacklist_file.flush()
    def tearDown(self):
        """Clean up test resources."""
        # Close and remove temporary files
        self.whitelist_file.close()
        self.blacklist_file.close()
        os.unlink(self.whitelist_file.name)
        os.unlink(self.blacklist_file.name)
    def test_init_without_files(self):
        """Test initializing CommandFilter without files."""
        cmd_filter = CommandFilter()
        self.assertEqual(len(cmd_filter.whitelist), 0)
        self.assertEqual(len(cmd_filter.blacklist), 0)
    def test_init_with_files(self):
        """Test initializing CommandFilter with files."""
        cmd_filter = CommandFilter(
            whitelist_file=self.whitelist_file.name,
            blacklist_file=self.blacklist_file.name,
        )
        # Check whitelist loaded correctly (3 commands and 1 regex pattern)
        self.assertEqual(len(cmd_filter.whitelist), 4)
        self.assertIn("ls", cmd_filter.whitelist)
        self.assertIn("cat", cmd_filter.whitelist)
        self.assertIn("pwd", cmd_filter.whitelist)
        self.assertIn("^git.*", cmd_filter.whitelist)
        # Check blacklist loaded correctly (2 commands and 1 regex pattern)
        self.assertEqual(len(cmd_filter.blacklist), 3)
        self.assertIn("rm", cmd_filter.blacklist)
        self.assertIn("sudo", cmd_filter.blacklist)
        self.assertIn("^.*eval.*", cmd_filter.blacklist)
    def test_whitelist_mode(self):
        """Test whitelist mode functionality."""
        cmd_filter = CommandFilter(
            whitelist_file=self.whitelist_file.name, whitelist_mode=True
        )
        # Test allowed commands in whitelist mode
        allowed, _ = cmd_filter.is_command_allowed("ls -la")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("cat file.txt")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("git status")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("git clone repo")
        self.assertTrue(allowed)
        # Test blocked commands in whitelist mode
        allowed, reason = cmd_filter.is_command_allowed("rm file.txt")
        self.assertFalse(allowed)
        self.assertIn("not in whitelist", reason)
        allowed, reason = cmd_filter.is_command_allowed("docker ps")
        self.assertFalse(allowed)
        self.assertIn("not in whitelist", reason)
    def test_blacklist_mode(self):
        """Test blacklist mode functionality."""
        cmd_filter = CommandFilter(
            blacklist_file=self.blacklist_file.name, whitelist_mode=False
        )
        # Test allowed commands in blacklist mode
        allowed, _ = cmd_filter.is_command_allowed("ls -la")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("cat file.txt")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("echo Hello")
        self.assertTrue(allowed)
        # Test blocked commands in blacklist mode
        allowed, reason = cmd_filter.is_command_allowed("rm file.txt")
        self.assertFalse(allowed)
        self.assertIn("blacklisted", reason)
        allowed, reason = cmd_filter.is_command_allowed("sudo apt-get update")
        self.assertFalse(allowed)
        self.assertIn("blacklisted", reason)
        allowed, reason = cmd_filter.is_command_allowed(
            "python -c 'eval(\"print(1)\")'"
        )
        self.assertFalse(allowed)
        self.assertIn("blacklisted", reason)
    def test_empty_whitelist_mode(self):
        """Test whitelist mode with empty whitelist."""
        cmd_filter = CommandFilter(whitelist_mode=True)
        allowed, reason = cmd_filter.is_command_allowed("ls")
        self.assertFalse(allowed)
        self.assertIn("whitelist is empty", reason)
    def test_regex_patterns(self):
        """Test regex pattern matching."""
        # Create a filter with specific patterns
        cmd_filter = CommandFilter()
        cmd_filter.whitelist.add("^git (pull|push|status)$")
        cmd_filter.whitelist.add("^ls( -[la]+)?$")
        cmd_filter.whitelist_mode = True
        # Test pattern matching
        allowed, _ = cmd_filter.is_command_allowed("git status")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("git pull")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("ls")
        self.assertTrue(allowed)
        allowed, _ = cmd_filter.is_command_allowed("ls -la")
        self.assertTrue(allowed)
        # Test non-matching patterns
        allowed, _ = cmd_filter.is_command_allowed("git clone")
        self.assertFalse(allowed)
        allowed, _ = cmd_filter.is_command_allowed("ls -R")
        self.assertFalse(allowed)
    def test_invalid_regex_pattern(self):
        """Test handling invalid regex patterns."""
        cmd_filter = CommandFilter()
        cmd_filter.whitelist.add("^*invalid")  # Invalid regex
        cmd_filter.whitelist_mode = True
        # Should not raise exception, but should not match
        allowed, _ = cmd_filter.is_command_allowed("invalid regex")
        self.assertFalse(allowed)
if __name__ == "__main__":
    unittest.main()
```
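The regex tests above suggest that whitelist entries are treated as regular expressions and that an invalid pattern is skipped rather than raised. The following is a minimal sketch of that behaviour, not the project's actual `CommandFilter`: the function name `is_allowed` and both reason strings other than "whitelist is empty" are assumptions made for illustration.

```python
import re
from typing import Iterable, Tuple


def is_allowed(command: str, whitelist: Iterable[str]) -> Tuple[bool, str]:
    """Return (allowed, reason) for a command checked against regex patterns.

    Hypothetical helper: it mirrors what the tests expect, not the real class.
    """
    patterns = list(whitelist)
    if not patterns:
        # Whitelist mode with nothing listed denies every command.
        return False, "whitelist is empty"
    for pattern in patterns:
        try:
            if re.search(pattern, command):
                return True, "matched whitelist pattern"
        except re.error:
            # An invalid pattern such as "^*invalid" is ignored, so it never matches.
            continue
    return False, "command not in whitelist"
```

Anchoring each pattern with `^` and `$`, as the tests do, keeps `git status; rm -rf /` from matching a pattern written for `git status`.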
--------------------------------------------------------------------------------
/src/mcp_terminal/tools/terminal.py:
--------------------------------------------------------------------------------
```python
"""
Terminal tool for MCP.
Provides terminal control operations through the MCP interface.
"""
import logging
import os
from typing import Optional
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from mcp_terminal.controllers import get_controller
from mcp_terminal.security.command_filter import CommandFilter
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("MCP:Terminal:Tool")
# Define the models for the execute command function
class ExecuteCommandRequest(BaseModel):
    """Request model for executing a terminal command."""
    command: str = Field(..., description="The command to execute in the terminal")
    wait_for_output: bool = Field(
        True, description="Whether to wait for the command output"
    )
    timeout: int = Field(
        10, description="Timeout in seconds for waiting for the command output"
    )
class ExecuteCommandResponse(BaseModel):
    """Response model for the executed terminal command."""
    success: bool = Field(
        ..., description="Whether the command execution was successful"
    )
    output: Optional[str] = Field(None, description="The command output if available")
    error: Optional[str] = Field(
        None, description="Error message if the command failed"
    )
    return_code: Optional[int] = Field(
        None, description="The command return code if available"
    )
    warning: Optional[str] = Field(None, description="Warning message if any")
class TerminalInfoResponse(BaseModel):
    """Response model for terminal information."""
    terminal_type: str = Field(..., description="The type of terminal being used")
    platform: str = Field(..., description="The platform the terminal is running on")
    current_directory: str = Field(
        ..., description="Current working directory of the terminal"
    )
    user: str = Field(..., description="Current user name")
    shell: Optional[str] = Field(None, description="Shell being used")
    terminal_size: Optional[dict] = Field(
        None, description="Terminal dimensions (rows, columns)"
    )
class TerminalTool:
    """
    MCP tool for terminal operations.
    This tool provides functions to execute commands in the terminal
    and get information about the terminal.
    """
    def __init__(
        self,
        controller_type: Optional[str] = None,
        whitelist_file: Optional[str] = None,
        blacklist_file: Optional[str] = None,
        whitelist_mode: bool = False,
    ):
        """
        Initialize the terminal tool.
        Args:
            controller_type: The type of controller to use ("iterm", "applescript", "subprocess")
                           or None to auto-detect
            whitelist_file: Path to whitelist file
            blacklist_file: Path to blacklist file
            whitelist_mode: If True, only whitelisted commands are allowed
        """
        self.name = "terminal"
        self.controller_type = controller_type
        self.controller = None
        self._init_controller()
        # Initialize command filter
        self.command_filter = CommandFilter(
            whitelist_file=whitelist_file,
            blacklist_file=blacklist_file,
            whitelist_mode=whitelist_mode,
        )
    def _init_controller(self):
        """Initialize the terminal controller."""
        try:
            self.controller = get_controller(self.controller_type)
            logger.info(
                f"Initialized terminal controller: {type(self.controller).__name__}"
            )
        except Exception as e:
            logger.error(f"Failed to initialize terminal controller: {e}")
            raise
    def register_mcp(self, mcp: FastMCP) -> None:
        """Register the terminal tool with the MCP server."""
        @mcp.tool(name="execute_command", description="Executes a terminal command")
        async def execute_command(
            command: str,
            wait_for_output: bool = True,
            timeout: int = 10,
        ) -> ExecuteCommandResponse:
            try:
                # Check if command is allowed
                is_allowed, reason = self.command_filter.is_command_allowed(command)
                if not is_allowed:
                    logger.warning(
                        f"Command execution denied: {command}. Reason: {reason}"
                    )
                    return ExecuteCommandResponse(
                        success=False,
                        error=f"Command not allowed: {reason}",
                    )
                # Ensure we have a controller
                if not self.controller:
                    self._init_controller()
                # Execute the command
                result = await self.controller.execute_command(
                    command, wait_for_output, timeout
                )
                # Convert to response model
                return ExecuteCommandResponse(
                    success=result.get("success", False),
                    output=result.get("output"),
                    error=result.get("error"),
                    return_code=result.get("return_code"),
                    warning=result.get("warning"),
                )
            except Exception as e:
                logger.error(f"Error executing command: {e}")
                return ExecuteCommandResponse(
                    success=False, error=f"Error executing command: {str(e)}"
                )
        @mcp.tool(name="get_terminal_info", description="Gets terminal information")
        async def get_terminal_info() -> TerminalInfoResponse:
            try:
                # Ensure we have a controller
                if not self.controller:
                    self._init_controller()
                # Get terminal type
                terminal_type = await self.controller.get_terminal_type()
                # Get platform
                # os is already imported at module level
                import getpass
                import platform
                import shutil
                platform_name = platform.system()
                # Get current directory directly
                current_dir = None
                try:
                    pwd_result = await self.controller.execute_command(
                        "pwd", wait_for_output=True, timeout=5
                    )
                    if pwd_result.get("success") and pwd_result.get("output"):
                        # Clean the output by splitting lines and finding a valid path
                        lines = pwd_result.get("output").splitlines()
                        for line in lines:
                            line = line.strip()
                            # On macOS/Linux, a valid path should start with /
                            if line.startswith("/"):
                                current_dir = line
                                break
                except Exception as e:
                    logger.debug(f"Error getting current directory from terminal: {e}")
                # Fallback to Python's os.getcwd()
                if not current_dir:
                    current_dir = os.getcwd()
                # Get user
                user = getpass.getuser()
                # Get shell
                shell = os.environ.get("SHELL", None)
                # Get terminal size if possible
                terminal_size = None
                try:
                    cols, rows = shutil.get_terminal_size(fallback=(80, 24))
                    terminal_size = {"rows": rows, "columns": cols}
                except Exception:
                    pass
                return TerminalInfoResponse(
                    terminal_type=terminal_type,
                    platform=platform_name,
                    current_directory=current_dir,
                    user=user,
                    shell=shell,
                    terminal_size=terminal_size,
                )
            except Exception as e:
                logger.error(f"Error getting terminal info: {e}")
                return TerminalInfoResponse(
                    terminal_type="unknown",
                    platform="unknown",
                    current_directory="unknown",
                    user="unknown",
                )
```
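The `get_terminal_info` tool above cleans the `pwd` output by taking the first line that looks like an absolute POSIX path and falls back to `os.getcwd()` otherwise. A minimal sketch of that step as standalone functions follows; the names `first_absolute_path` and `resolve_cwd` are hypothetical and do not exist in the module.

```python
import os
from typing import Optional


def first_absolute_path(output: Optional[str]) -> Optional[str]:
    """Return the first line of terminal output that starts with '/', if any."""
    for line in (output or "").splitlines():
        line = line.strip()
        # On macOS/Linux an absolute path starts with '/'; prompts and echoes do not.
        if line.startswith("/"):
            return line
    return None


def resolve_cwd(output: Optional[str]) -> str:
    """Prefer the directory reported by the terminal, else this process's cwd."""
    return first_absolute_path(output) or os.getcwd()
```

Pulling the parsing out this way would let it be unit-tested without a live terminal controller. The fallback reports the server process's directory, which may differ from the terminal session's.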
--------------------------------------------------------------------------------
/src/mcp_terminal/controllers/iterm.py:
--------------------------------------------------------------------------------
```python
"""
iTerm2 controller using the official iTerm2 Python API.
This provides more advanced control over iTerm2.
"""
import asyncio
import platform
import time
from typing import Any, Dict, List, Optional
try:
    import iterm2
    ITERM2_AVAILABLE = True
except ImportError:
    ITERM2_AVAILABLE = False
from mcp_terminal.controllers.base import BaseTerminalController
class ITermController(BaseTerminalController):
    """Terminal controller using the iTerm2 Python API."""
    def __init__(self):
        """Initialize the iTerm2 controller."""
        if platform.system() != "Darwin":
            raise RuntimeError("iTerm2 controller only works on macOS")
        if not ITERM2_AVAILABLE:
            raise ImportError(
                "iTerm2 Python API not available. Install with 'pip install iterm2'"
            )
        self.connection = None
        self.app = None
        self.current_session = None
    async def _ensure_connection(self) -> bool:
        """
        Ensure connection to iTerm2.
        Returns:
            True if connected, False otherwise
        """
        if self.connection is not None:
            return True
        try:
            self.connection = await iterm2.Connection.async_create()
            self.app = await iterm2.async_get_app(self.connection)
            return True
        except Exception as e:
            print(f"Failed to connect to iTerm2: {e}")
            return False
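    # The annotation is quoted so that defining the class does not raise
    # NameError when the optional iterm2 import above has failed.
    async def _ensure_session(self) -> Optional["iterm2.Session"]: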
        """
        Ensure there's an active window and session.
        Returns:
            The active session or None if failed
        """
        # First, use AppleScript to ensure iTerm2 is running with a window and tab
        try:
            import subprocess
            print(
                "Ensuring iTerm2 is running with a window and tab using AppleScript..."
            )
            # AppleScript to launch iTerm2, create a window if none exists, and create a tab if none exists
            applescript = """
            tell application "iTerm2"
                activate
                if (count of windows) is 0 then
                    create window with default profile
                end if
                tell current window
                    if (count of tabs) is 0 then
                        create tab with default profile
                    end if
                    tell current session
                        -- Just to ensure the session is ready
                        write text ""
                    end tell
                end tell
            end tell
            """
            # Run the AppleScript
            result = subprocess.run(
                ["osascript", "-e", applescript], capture_output=True, text=True
            )
            if result.returncode != 0:
                print(f"Error running AppleScript: {result.stderr}")
            else:
                print("Successfully launched iTerm2 with AppleScript")
            # Wait for iTerm2 to fully initialize without blocking the event loop
            await asyncio.sleep(3)
        except Exception as e:
            print(f"Error launching iTerm2 with AppleScript: {e}")
        # Try to ensure connection multiple times
        max_attempts = 5  # Increased number of attempts
        for attempt in range(max_attempts):
            if await self._ensure_connection():
                print(f"Successfully connected to iTerm2 on attempt {attempt+1}")
                break
            print(f"Connection attempt {attempt+1}/{max_attempts} failed. Retrying...")
            await asyncio.sleep(2)  # Longer delay between attempts
        else:
            print("Failed to connect to iTerm2 after multiple attempts")
            return None
        try:
            # Get all windows
            windows = await self.app.async_get_windows()
            if not windows:
                print("No windows found even after AppleScript initialization")
                return None
            window = windows[0]
            print(f"Found {len(windows)} window(s)")
            # Get all tabs
            tabs = await window.async_get_tabs()
            if not tabs:
                print("No tabs found even after AppleScript initialization")
                return None
            tab = tabs[0]
            print(f"Found {len(tabs)} tab(s)")
            # Get current session
            self.current_session = await tab.async_get_active_session()
            if self.current_session is None:
                print("No active session found even after AppleScript initialization")
                return None
            print("Successfully obtained iTerm2 session")
            return self.current_session
        except Exception as e:
            print(f"Error ensuring iTerm2 session: {e}")
            return None
    async def execute_command(
        self, command: str, wait_for_output: bool = True, timeout: int = 10
    ) -> Dict[str, Any]:
        """
        Execute a command in iTerm2.
        Args:
            command: The command to execute
            wait_for_output: Whether to wait for output
            timeout: Timeout in seconds
        Returns:
            A dictionary with the result of the command execution
        """
        try:
            session = await self._ensure_session()
            if session is None:
                return {
                    "success": False,
                    "error": "Failed to get iTerm2 session",
                }
            # Send the command
            await session.async_send_text(command + "\n")
            if wait_for_output:
                # Wait for a moment to let the command execute
                await asyncio.sleep(0.5)
                # Initialize variables for capturing output
                start_time = time.time()
                initial_lines = await session.async_get_screen_contents()
                last_line_count = len(initial_lines.contents)
                last_update = time.time()
                # Wait for output to stop changing
                while True:
                    # Check if we've exceeded the timeout
                    current_time = time.time()
                    if current_time - start_time > timeout:
                        break
                    # Get current screen contents
                    screen = await session.async_get_screen_contents()
                    current_line_count = len(screen.contents)
                    # If output has changed, update the last_update time
                    if current_line_count != last_line_count:
                        last_line_count = current_line_count
                        last_update = current_time
                    # If output hasn't changed for a second, we're probably done
                    if current_time - last_update > 1.0:
                        break
                    # Sleep to avoid consuming too much CPU
                    await asyncio.sleep(0.2)
                # Get the final output
                screen = await session.async_get_screen_contents()
                lines = [line.string for line in screen.contents]
                # Try to extract just the command output by removing the command itself
                try:
                    # Find the command in the output
                    cmd_line_index = -1
                    for i, line in enumerate(lines):
                        if command in line:
                            cmd_line_index = i
                            break
                    # If we found the command, return everything after it
                    if cmd_line_index >= 0 and cmd_line_index < len(lines) - 1:
                        output = "\n".join(lines[cmd_line_index + 1 :])
                    else:
                        output = "\n".join(lines)
                    return {"success": True, "output": output}
                except Exception as e:
                    # If parsing fails, return all output
                    return {
                        "success": True,
                        "output": "\n".join(lines),
                        "parse_error": str(e),
                    }
            return {"success": True, "output": "Command sent (output not captured)"}
        except Exception as e:
            return {"success": False, "error": f"Error executing command: {str(e)}"}
    async def get_terminal_type(self) -> str:
        """
        Get the terminal type.
        Returns:
            The terminal type
        """
        return "iTerm2"
    async def cleanup(self) -> None:
        """
        Clean up resources.
        """
        try:
            if self.connection:
                # Try to close the connection gracefully
                try:
                    await asyncio.wait_for(self.connection.async_close(), timeout=2.0)
                except asyncio.TimeoutError:
                    # If it times out, log a warning but continue cleanup
                    print("Warning: iTerm2 connection close timed out")
                except Exception as e:
                    # If any other error occurs, log it but continue cleanup
                    print(f"Error closing iTerm2 connection: {e}")
                # Ensure references are cleared even if close fails
                self.connection = None
                self.app = None
                self.current_session = None
        except Exception as e:
            print(f"Error during iTerm2 controller cleanup: {e}")
if __name__ == "__main__":
    controller = ITermController()
    # execute_command is a coroutine, so it must be run on an event loop
    print(asyncio.run(controller.execute_command("ls -l")))
```
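The controller above decides that a command has finished when the screen's line count stops changing for about one second, or when the timeout expires. The sketch below isolates that polling idea from iTerm2 so it can be run anywhere. The function name `wait_until_stable` and the `read_size` callback are assumptions for illustration, and it uses `time.monotonic()` instead of `time.time()` so that wall-clock adjustments cannot distort the intervals.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_stable(
    read_size: Callable[[], Awaitable[int]],
    timeout: float = 10.0,
    quiet_period: float = 1.0,
    poll_interval: float = 0.2,
) -> bool:
    """Poll read_size until its value stays unchanged for quiet_period seconds.

    Returns True if the value settled, False if the timeout expired first.
    """
    start = time.monotonic()
    last_size = await read_size()
    last_change = start
    while True:
        now = time.monotonic()
        if now - start > timeout:
            return False
        size = await read_size()
        if size != last_size:
            # Output is still arriving: restart the quiet-period clock.
            last_size, last_change = size, now
        elif now - last_change > quiet_period:
            return True
        # Sleep between polls to avoid a busy loop.
        await asyncio.sleep(poll_interval)
```

A line count is a coarse signal: a command that rewrites the same lines, such as a progress bar, or one that pauses for longer than the quiet period, would be treated as finished too early.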
--------------------------------------------------------------------------------
/src/mcp_terminal/server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
MCP Terminal Server
This module implements a terminal control server using the Model Context Protocol (MCP).
It supports both standard input/output (stdio) and Server-Sent Events (SSE) transports.
"""
import argparse
import asyncio
import logging
import platform
import signal
import sys
from enum import Enum
from typing import Optional
from mcp.server.fastmcp import FastMCP
from mcp_terminal.tools.file import FileTool
from mcp_terminal.tools.terminal import TerminalTool
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("MCP:Terminal:Server")
class ServerMode(str, Enum):
    """Server transport modes."""
    STDIO = "stdio"
    SSE = "sse"
class MCPTerminalServer:
    """
    MCP Terminal Server that registers and exposes terminal tools.
    """
    def __init__(
        self,
        controller_type: Optional[str] = None,
        mode: ServerMode = ServerMode.STDIO,
        host: str = "127.0.0.1",
        port: int = 3000,
        log_level: str = "INFO",
        whitelist_file: Optional[str] = None,
        blacklist_file: Optional[str] = None,
        whitelist_mode: bool = False,
    ):
        """
        Initialize the MCP Terminal Server.
        Args:
            controller_type: Type of terminal controller to use ("iterm", "applescript", "subprocess")
            mode: Server transport mode (stdio or sse)
            host: Host to bind the server to (for SSE mode)
            port: Port to bind the server to (for SSE mode)
            log_level: Logging level
            whitelist_file: Path to command whitelist file
            blacklist_file: Path to command blacklist file
            whitelist_mode: If True, only whitelisted commands are allowed
        """
        self.controller_type = controller_type
        self.mode = mode
        self.host = host
        self.port = port
        self.whitelist_file = whitelist_file
        self.blacklist_file = blacklist_file
        self.whitelist_mode = whitelist_mode
        # Set up logging
        logging.getLogger().setLevel(getattr(logging, log_level))
        # Create MCP server
        self.mcp = FastMCP("mcp-terminal", host=host, port=port, log_level=log_level)
        self.tools = {}
        self.tools_registered = False
    def register_tools(self) -> None:
        """
        Register all tools with the MCP server.
        """
        if self.tools_registered:
            return
        logger.info("Registering terminal and file tools with MCP server")
        try:
            # Create and register the terminal tool
            terminal_tool = TerminalTool(
                self.controller_type,
                whitelist_file=self.whitelist_file,
                blacklist_file=self.blacklist_file,
                whitelist_mode=self.whitelist_mode,
            )
            file_tool = FileTool()
            terminal_tool.register_mcp(self.mcp)
            file_tool.register_mcp(self.mcp)
            self.tools["terminal"] = terminal_tool
            self.tools["file"] = file_tool
            self.tools_registered = True
            logger.info("Terminal and file tools registered with MCP server")
        except Exception as e:
            logger.error(f"Failed to register tools: {str(e)}", exc_info=True)
            raise
    async def start(self) -> None:
        """
        Start the MCP Terminal Server.
        """
        # Register all tools
        self.register_tools()
        # Start the server using the appropriate transport
        if self.mode == ServerMode.SSE:
            logger.info(
                f"Starting MCP Terminal Server in SSE mode on {self.host}:{self.port}"
            )
            await self.mcp.run_sse_async()
        else:  # STDIO mode
            logger.info("Starting MCP Terminal Server in stdio mode")
            await self.mcp.run_stdio_async()
    async def cleanup(self) -> None:
        """
        Clean up resources before shutting down.
        """
        logger.info("Starting cleanup process")
        # First ensure MCP resources are properly cleaned up
        if hasattr(self.mcp, "_mcp_server") and hasattr(
            self.mcp._mcp_server, "_task_group"
        ):
            logger.info("Ensuring MCP task groups are properly closed")
            try:
                # Give in-flight requests a chance to complete
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.warning(f"Error during MCP cleanup delay: {e}")
        # Then clean up tool controllers
        for tool_name, tool in self.tools.items():
            if hasattr(tool, "controller") and hasattr(tool.controller, "cleanup"):
                logger.info(f"Cleaning up {tool_name} controller")
                try:
                    await tool.controller.cleanup()
                except Exception as e:
                    logger.warning(f"Error cleaning up {tool_name} controller: {e}")
        logger.info("Cleanup process completed")
def main() -> None:
    """
    Main entry point for the MCP Terminal Server.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="MCP Terminal Server")
    # Terminal controller options
    controller_group = parser.add_argument_group("Terminal Controller Options")
    controller_group.add_argument(
        "--controller",
        "-c",
        choices=["auto", "iterm", "applescript", "subprocess"],
        default="auto",
        help="Terminal controller to use (default: auto-detect)",
    )
    # Server mode options
    server_group = parser.add_argument_group("Server Options")
    server_group.add_argument(
        "--mode",
        "-m",
        choices=[mode.value for mode in ServerMode],
        default=ServerMode.STDIO.value,
        help=f"Server mode (default: {ServerMode.STDIO.value})",
    )
    server_group.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host to bind the server to in SSE mode (default: 127.0.0.1)",
    )
    server_group.add_argument(
        "--port",
        "-p",
        type=int,
        default=3000,
        help="Port to bind the server to in SSE mode (default: 3000)",
    )
    # Command filtering options
    security_group = parser.add_argument_group("Security Options")
    security_group.add_argument(
        "--whitelist-file",
        type=str,
        help="Path to whitelist file with allowed commands (one per line)",
    )
    security_group.add_argument(
        "--blacklist-file",
        type=str,
        help="Path to blacklist file with blocked commands (one per line)",
    )
    security_group.add_argument(
        "--whitelist-mode",
        action="store_true",
        help="Enable whitelist mode (only allow commands in whitelist)",
    )
    # Logging options
    logging_group = parser.add_argument_group("Logging Options")
    logging_group.add_argument(
        "--log-level",
        "-l",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="Logging level (default: INFO)",
    )
    args = parser.parse_args()
    # Determine controller type
    controller_type = None if args.controller == "auto" else args.controller
    # Adjust controller based on platform
    if controller_type in ["iterm", "applescript"] and platform.system() != "Darwin":
        logger.warning(
            f"{controller_type} controller not available on {platform.system()}. Falling back to subprocess."
        )
        controller_type = "subprocess"
    # Create the server
    server = MCPTerminalServer(
        controller_type=controller_type,
        mode=ServerMode(args.mode),
        host=args.host,
        port=args.port,
        log_level=args.log_level,
        whitelist_file=args.whitelist_file,
        blacklist_file=args.blacklist_file,
        whitelist_mode=args.whitelist_mode,
    )
    # Run the server
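    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)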
    shutdown_in_progress = False
    # Define a shutdown handler that ensures we only run shutdown once
    async def handle_shutdown():
        nonlocal shutdown_in_progress
        if shutdown_in_progress:
            logger.info("Shutdown already in progress, ignoring additional signal")
            return
        shutdown_in_progress = True
        await shutdown(loop, server)
    try:
        # Set up signal handlers for graceful shutdown
        if (
            sys.platform != "win32"
        ):  # Windows doesn't support SIGTERM/SIGINT handling in the same way
            for signal_name in ["SIGTERM", "SIGINT"]:
                loop.add_signal_handler(
                    getattr(signal, signal_name),
                    lambda: asyncio.create_task(handle_shutdown()),
                )
        # Run the server
        loop.run_until_complete(server.start())
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
        # Handle KeyboardInterrupt manually by running the shutdown coroutine
        if not shutdown_in_progress:
            try:
                # Make sure we properly shut down even if interrupted
                loop.run_until_complete(handle_shutdown())
            except Exception as e:
                logger.error(f"Error during shutdown after keyboard interrupt: {e}")
    except Exception as e:
        logger.error(f"Error during server execution: {e}", exc_info=True)
    finally:
        logger.info("Server shutting down")
        # Close the event loop
        try:
            # Gather any remaining tasks
            pending = asyncio.all_tasks(loop)
            if pending:
                # Cancel all remaining tasks
                for task in pending:
                    task.cancel()
                # Use a separate try/except block to avoid suppressing the original exception
                try:
                    # Wait for a short time for tasks to cancel
                    loop.run_until_complete(asyncio.wait(pending, timeout=0.5))
                except (asyncio.CancelledError, RuntimeError):
                    # Ignore cancelled errors and "Event loop stopped before Future completed"
                    pass
        except Exception as e:
            logger.error(f"Error during final cleanup: {e}")
        finally:
            # Close the event loop
            try:
                loop.close()
            except Exception as e:
                logger.error(f"Error closing event loop: {e}")
        # Exit with a success code
        sys.exit(0)
async def shutdown(loop, server):
    """Handle graceful shutdown."""
    logger.info("Shutdown signal received")
    # Give pending tasks a chance to complete
    await asyncio.sleep(0.2)
    # Perform cleanup
    await server.cleanup()
    # Cancel all running tasks
    tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
    if tasks:
        logger.info(f"Cancelling {len(tasks)} pending tasks")
        for task in tasks:
            task.cancel()
        # Wait for tasks to cancel with a timeout
        try:
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=3.0,  # Increased timeout
            )
        except asyncio.TimeoutError:
            logger.warning("Some tasks did not cancel within the timeout period")
        except Exception as e:
            logger.warning(f"Error during task cancellation: {e}")
    # Stop the loop
    loop.stop()
if __name__ == "__main__":
    main()
```
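In `main()` above, the `--controller` choice is normalised in two steps: `auto` becomes `None` so the controller is auto-detected, and the macOS-only controllers fall back to `subprocess` on other platforms. A minimal sketch of that rule as a pure function follows. The name `resolve_controller` is hypothetical, and the platform is passed in as a parameter rather than read from `platform.system()` so the rule can be checked without running on each operating system.

```python
from typing import Optional


def resolve_controller(choice: str, system: str) -> Optional[str]:
    """Map a --controller choice to a controller type for the given platform.

    "auto" becomes None (auto-detect); "iterm" and "applescript" fall back to
    "subprocess" on any platform other than macOS ("Darwin").
    """
    controller_type = None if choice == "auto" else choice
    if controller_type in ("iterm", "applescript") and system != "Darwin":
        return "subprocess"
    return controller_type
```

For instance, `resolve_controller("iterm", "Linux")` gives `"subprocess"`, while `resolve_controller("auto", "Linux")` stays `None` and leaves detection to `get_controller`.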