# Directory Structure ``` ├── .editorconfig ├── .github │ ├── actions │ │ └── setup-python-env │ │ └── action.yml │ └── workflows │ ├── main.yml │ ├── on-release-main.yml │ └── validate-codecov-config.yml ├── .gitignore ├── .pre-commit-config.yaml ├── .python-version ├── .vscode │ └── settings.json ├── codecov.yaml ├── CONTRIBUTING.md ├── dev │ ├── claude_desktop_config.json │ └── install_claude_desktop.py ├── Dockerfile ├── docs │ ├── index.md │ └── modules.md ├── LICENSE ├── Makefile ├── mcp_email_server │ ├── __init__.py │ ├── app.py │ ├── cli.py │ ├── config.py │ ├── emails │ │ ├── __init__.py │ │ ├── classic.py │ │ ├── dispatcher.py │ │ ├── models.py │ │ └── provider │ │ └── __init__.py │ ├── log.py │ ├── tools │ │ ├── __init__.py │ │ ├── claude_desktop_config.json │ │ └── installer.py │ └── ui.py ├── mkdocs.yml ├── pyproject.toml ├── pytest.ini ├── README.md ├── smithery.yaml ├── tests │ ├── conftest.py │ ├── test_classic_handler.py │ ├── test_config.py │ ├── test_dispatcher.py │ ├── test_email_client.py │ ├── test_env_config_coverage.py │ ├── test_mcp_tools.py │ └── test_models.py ├── tox.ini └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12 ``` -------------------------------------------------------------------------------- /.editorconfig: -------------------------------------------------------------------------------- ``` max_line_length = 120 [*.json] indent_style = space indent_size = 4 ``` -------------------------------------------------------------------------------- /.pre-commit-config.yaml: -------------------------------------------------------------------------------- ```yaml repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: "v6.0.0" hooks: - id: check-case-conflict - id: check-merge-conflict - id: check-toml - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit rev: "v0.14.0" hooks: - id: ruff args: [--exit-non-zero-on-fix] - id: ruff-format - repo: https://github.com/pre-commit/mirrors-prettier rev: "v4.0.0-alpha.8" hooks: - id: prettier ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` docs/source # From https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # Vscode config files # .vscode/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ tests/config.toml local/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # mcp-email-server [](https://img.shields.io/github/v/release/ai-zerolab/mcp-email-server) [](https://github.com/ai-zerolab/mcp-email-server/actions/workflows/main.yml?query=branch%3Amain) [](https://codecov.io/gh/ai-zerolab/mcp-email-server) [](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-email-server) [](https://img.shields.io/github/license/ai-zerolab/mcp-email-server) [](https://smithery.ai/server/@ai-zerolab/mcp-email-server) IMAP and SMTP via MCP Server - **Github repository**: <https://github.com/ai-zerolab/mcp-email-server/> - **Documentation** <https://ai-zerolab.github.io/mcp-email-server/> ## Installation ### Manual Installation We recommend using [uv](https://github.com/astral-sh/uv) to manage your environment. Try `uvx mcp-email-server@latest ui` to config, and use following configuration for mcp client: ```json { "mcpServers": { "zerolib-email": { "command": "uvx", "args": ["mcp-email-server@latest", "stdio"] } } } ``` This package is available on PyPI, so you can install it using `pip install mcp-email-server` After that, configure your email server using the ui: `mcp-email-server ui` ### Environment Variable Configuration You can also configure the email server using environment variables, which is particularly useful for CI/CD environments like Jenkins. zerolib-email supports both UI configuration (via TOML file) and environment variables, with environment variables taking precedence. ```json { "mcpServers": { "zerolib-email": { "command": "uvx", "args": ["mcp-email-server@latest", "stdio"], "env": { "MCP_EMAIL_SERVER_ACCOUNT_NAME": "work", "MCP_EMAIL_SERVER_FULL_NAME": "John Doe", "MCP_EMAIL_SERVER_EMAIL_ADDRESS": "[email protected]", "MCP_EMAIL_SERVER_USER_NAME": "[email protected]", "MCP_EMAIL_SERVER_PASSWORD": "your_password", "MCP_EMAIL_SERVER_IMAP_HOST": "imap.gmail.com", "MCP_EMAIL_SERVER_IMAP_PORT": "993", "MCP_EMAIL_SERVER_SMTP_HOST": "smtp.gmail.com", "MCP_EMAIL_SERVER_SMTP_PORT": "465" } } } } ``` #### Available Environment Variables | Variable | Description | Default | Required | | --------------------------------- | ------------------ | ------------- | -------- | | `MCP_EMAIL_SERVER_ACCOUNT_NAME` | Account identifier | `"default"` | No | | `MCP_EMAIL_SERVER_FULL_NAME` | Display name | Email prefix | No | | `MCP_EMAIL_SERVER_EMAIL_ADDRESS` | Email address | - | Yes | | `MCP_EMAIL_SERVER_USER_NAME` | Login username | Same as email | No | | `MCP_EMAIL_SERVER_PASSWORD` | Email password | - | Yes | | `MCP_EMAIL_SERVER_IMAP_HOST` | IMAP server host | - | Yes | | `MCP_EMAIL_SERVER_IMAP_PORT` | IMAP server port | `993` | No | | `MCP_EMAIL_SERVER_IMAP_SSL` | Enable IMAP SSL | `true` | No | | `MCP_EMAIL_SERVER_SMTP_HOST` | SMTP server host | - | Yes | | `MCP_EMAIL_SERVER_SMTP_PORT` | SMTP server port | `465` | No | | `MCP_EMAIL_SERVER_SMTP_SSL` | Enable SMTP SSL | `true` | No | | `MCP_EMAIL_SERVER_SMTP_START_SSL` | Enable STARTTLS | `false` | No | For separate IMAP/SMTP credentials, you can also use: - `MCP_EMAIL_SERVER_IMAP_USER_NAME` / `MCP_EMAIL_SERVER_IMAP_PASSWORD` - `MCP_EMAIL_SERVER_SMTP_USER_NAME` / `MCP_EMAIL_SERVER_SMTP_PASSWORD` Then you can try it in [Claude Desktop](https://claude.ai/download). If you want to intergrate it with other mcp client, run `$which mcp-email-server` for the path and configure it in your client like: ```json { "mcpServers": { "zerolib-email": { "command": "{{ ENTRYPOINT }}", "args": ["stdio"] } } } ``` If `docker` is avaliable, you can try use docker image, but you may need to config it in your client using `tools` via `MCP`. The default config path is `~/.config/zerolib/mcp_email_server/config.toml` ```json { "mcpServers": { "zerolib-email": { "command": "docker", "args": ["run", "-it", "ghcr.io/ai-zerolab/mcp-email-server:latest"] } } } ``` ### Installing via Smithery To install Email Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ai-zerolab/mcp-email-server): ```bash npx -y @smithery/cli install @ai-zerolab/mcp-email-server --client claude ``` ## Development This project is managed using [uv](https://github.com/ai-zerolab/uv). Try `make install` to install the virtual environment and install the pre-commit hooks. Use `uv run mcp-email-server` for local development. ## Releasing a new version - Create an API Token on [PyPI](https://pypi.org/). - Add the API Token to your projects secrets with the name `PYPI_TOKEN` by visiting [this page](https://github.com/ai-zerolab/mcp-email-server/settings/secrets/actions/new). - Create a [new release](https://github.com/ai-zerolab/mcp-email-server/releases/new) on Github. - Create a new tag in the form `*.*.*`. For more details, see [here](https://fpgmaas.github.io/cookiecutter-uv/features/cicd/#how-to-trigger-a-release). ``` -------------------------------------------------------------------------------- /CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown # Contributing to `mcp-email-server` Contributions are welcome, and they are greatly appreciated! Every little bit helps, and credit will always be given. You can contribute in many ways: # Types of Contributions ## Report Bugs Report bugs at https://github.com/ai-zerolab/mcp-email-server/issues If you are reporting a bug, please include: - Your operating system name and version. - Any details about your local setup that might be helpful in troubleshooting. - Detailed steps to reproduce the bug. ## Fix Bugs Look through the GitHub issues for bugs. Anything tagged with "bug" and "help wanted" is open to whoever wants to implement a fix for it. ## Implement Features Look through the GitHub issues for features. Anything tagged with "enhancement" and "help wanted" is open to whoever wants to implement it. ## Write Documentation mcp-email-server could always use more documentation, whether as part of the official docs, in docstrings, or even on the web in blog posts, articles, and such. ## Submit Feedback The best way to send feedback is to file an issue at https://github.com/ai-zerolab/mcp-email-server/issues. If you are proposing a new feature: - Explain in detail how it would work. - Keep the scope as narrow as possible, to make it easier to implement. - Remember that this is a volunteer-driven project, and that contributions are welcome :) # Get Started! Ready to contribute? Here's how to set up `mcp-email-server` for local development. Please note this documentation assumes you already have `uv` and `Git` installed and ready to go. 1. Fork the `mcp-email-server` repo on GitHub. 2. Clone your fork locally: ```bash cd <directory_in_which_repo_should_be_created> git clone [email protected]:YOUR_NAME/mcp-email-server.git ``` 3. Now we need to install the environment. Navigate into the directory ```bash cd mcp-email-server ``` Then, install and activate the environment with: ```bash uv sync ``` 4. Install pre-commit to run linters/formatters at commit time: ```bash uv run pre-commit install ``` 5. Create a branch for local development: ```bash git checkout -b name-of-your-bugfix-or-feature ``` Now you can make your changes locally. 6. Don't forget to add test cases for your added functionality to the `tests` directory. 7. When you're done making changes, check that your changes pass the formatting tests. ```bash make check ``` Now, validate that all unit tests are passing: ```bash make test ``` 9. Before raising a pull request you should also run tox. This will run the tests across different versions of Python: ```bash tox ``` This requires you to have multiple versions of python installed. This step is also triggered in the CI/CD pipeline, so you could also choose to skip this step locally. 10. Commit your changes and push your branch to GitHub: ```bash git add . git commit -m "Your detailed description of your changes." git push origin name-of-your-bugfix-or-feature ``` 11. Submit a pull request through the GitHub website. # Pull Request Guidelines Before you submit a pull request, check that it meets these guidelines: 1. The pull request should include tests. 2. If the pull request adds functionality, the docs should be updated. Put your new functionality into a function with a docstring, and add the feature to the list in `README.md`. ``` -------------------------------------------------------------------------------- /mcp_email_server/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_email_server/emails/provider/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_email_server/tools/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /docs/modules.md: -------------------------------------------------------------------------------- ```markdown ::: mcp_email_server.app ``` -------------------------------------------------------------------------------- /.vscode/settings.json: -------------------------------------------------------------------------------- ```json { "python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python" } ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` # pytest.ini [pytest] asyncio_mode = auto # asyncio_default_fixture_loop_scope = ``` -------------------------------------------------------------------------------- /mcp_email_server/tools/claude_desktop_config.json: -------------------------------------------------------------------------------- ```json { "mcpServers": { "zerolib-email": { "command": "{{ ENTRYPOINT }}", "args": ["stdio"] } } } ``` -------------------------------------------------------------------------------- /dev/claude_desktop_config.json: -------------------------------------------------------------------------------- ```json { "mcpServers": { "email-dev": { "command": "{{ PWD }}/.venv/bin/python {{ PWD }}/mcp_email_server/cli.py", "args": ["stdio"] } } } ``` -------------------------------------------------------------------------------- /codecov.yaml: -------------------------------------------------------------------------------- ```yaml coverage: range: 70..100 round: down precision: 1 status: project: default: target: 90% threshold: 0.5% codecov: token: f927bff4-d404-4986-8c11-624eadda8431 ``` -------------------------------------------------------------------------------- /mcp_email_server/log.py: -------------------------------------------------------------------------------- ```python import os USER_DEFINED_LOG_LEVEL = os.getenv("MCP_EMAIL_SERVER_LOG_LEVEL", "INFO") os.environ["LOGURU_LEVEL"] = USER_DEFINED_LOG_LEVEL from loguru import logger # noqa: E402 __all__ = ["logger"] ``` -------------------------------------------------------------------------------- /.github/workflows/validate-codecov-config.yml: -------------------------------------------------------------------------------- ```yaml name: validate-codecov-config on: pull_request: paths: [codecov.yaml] push: branches: [main] jobs: validate-codecov-config: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 - name: Validate codecov configuration run: curl -sSL --fail-with-body --data-binary @codecov.yaml https://codecov.io/validate ``` -------------------------------------------------------------------------------- /tox.ini: -------------------------------------------------------------------------------- ``` [tox] skipsdist = true envlist = py310, py311, py312, py313, py314 [gh-actions] python = 3.10: py310 3.11: py311 3.12: py312 3.13: py313 ; 3.14: py314 [testenv] passenv = PYTHON_VERSION allowlist_externals = uv commands = uv sync --python {envpython} uv run python -m pytest --doctest-modules tests --cov --cov-config=pyproject.toml --cov-report=xml ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object properties: {} commandFunction: # A JS function that produces the CLI command based on the given config to start the MCP on stdio. |- (config) => ({ command: 'uv', args: ['run', 'mcp-email-server@latest', 'stdio'], env: {} }) exampleConfig: {} ``` -------------------------------------------------------------------------------- /mcp_email_server/cli.py: -------------------------------------------------------------------------------- ```python import typer from mcp_email_server.app import mcp from mcp_email_server.config import delete_settings app = typer.Typer() @app.command() def stdio(): mcp.run(transport="stdio") @app.command() def sse( host: str = "localhost", port: int = 9557, ): mcp.settings.host = host mcp.settings.port = port mcp.run(transport="sse") @app.command() def ui(): from mcp_email_server.ui import main as ui_main ui_main() @app.command() def reset(): delete_settings() typer.echo("✅ Config reset") if __name__ == "__main__": app(["stdio"]) ``` -------------------------------------------------------------------------------- /mcp_email_server/emails/dispatcher.py: -------------------------------------------------------------------------------- ```python from __future__ import annotations from typing import TYPE_CHECKING from mcp_email_server.config import EmailSettings, ProviderSettings, get_settings from mcp_email_server.emails.classic import ClassicEmailHandler if TYPE_CHECKING: from mcp_email_server.emails import EmailHandler def dispatch_handler(account_name: str) -> EmailHandler: settings = get_settings() account = settings.get_account(account_name) if isinstance(account, ProviderSettings): raise NotImplementedError if isinstance(account, EmailSettings): return ClassicEmailHandler(account) raise ValueError(f"Account {account_name} not found, available accounts: {settings.get_accounts()}") ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile # Install uv FROM python:3.12-slim # Install tini RUN apt-get update && \ apt-get install -y --no-install-recommends tini && \ rm -rf /var/lib/apt/lists/* COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv # Change the working directory to the `app` directory WORKDIR /app # Copy the lockfile and `pyproject.toml` into the image COPY uv.lock /app/uv.lock COPY pyproject.toml /app/pyproject.toml # Install dependencies RUN uv sync --frozen --no-install-project # Copy the project into the image COPY . /app # Sync the project RUN uv sync --frozen # Run the server ENTRYPOINT ["tini", "--", "uv", "run", "mcp-email-server"] CMD ["stdio"] ``` -------------------------------------------------------------------------------- /docs/index.md: -------------------------------------------------------------------------------- ```markdown # mcp-email-server [](https://img.shields.io/github/v/release/ai-zerolab/mcp-email-server) [](https://github.com/ai-zerolab/mcp-email-server/actions/workflows/main.yml?query=branch%3Amain) [](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-email-server) [](https://img.shields.io/github/license/ai-zerolab/mcp-email-server) IMAP and SMTP via MCP Server ``` -------------------------------------------------------------------------------- /.github/actions/setup-python-env/action.yml: -------------------------------------------------------------------------------- ```yaml name: "Setup Python Environment" description: "Set up Python environment for the given Python version" inputs: python-version: description: "Python version to use" required: true default: "3.12" uv-version: description: "uv version to use" required: true default: "0.6.2" runs: using: "composite" steps: - uses: actions/setup-python@v5 with: python-version: ${{ inputs.python-version }} - name: Install uv uses: astral-sh/setup-uv@v2 with: version: ${{ inputs.uv-version }} enable-cache: "true" cache-suffix: ${{ matrix.python-version }} - name: Install Python dependencies run: uv sync --frozen --python=${{ matrix.python-version }} shell: bash ``` -------------------------------------------------------------------------------- /mcp_email_server/emails/__init__.py: -------------------------------------------------------------------------------- ```python import abc from datetime import datetime from typing import TYPE_CHECKING if TYPE_CHECKING: from mcp_email_server.emails.models import EmailContentBatchResponse, EmailMetadataPageResponse class EmailHandler(abc.ABC): @abc.abstractmethod async def get_emails_metadata( self, page: int = 1, page_size: int = 10, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, order: str = "desc", ) -> "EmailMetadataPageResponse": """ Get email metadata only (without body content) for better performance """ @abc.abstractmethod async def get_emails_content(self, email_ids: list[str]) -> "EmailContentBatchResponse": """ Get full content (including body) of multiple emails by their email IDs (IMAP UIDs) """ @abc.abstractmethod async def send_email( self, recipients: list[str], subject: str, body: str, cc: list[str] | None = None, bcc: list[str] | None = None, html: bool = False, ) -> None: """ Send email """ ``` -------------------------------------------------------------------------------- /mkdocs.yml: -------------------------------------------------------------------------------- ```yaml site_name: mcp-email-server repo_url: https://github.com/ai-zerolab/mcp-email-server site_url: https://ai-zerolab.github.io/mcp-email-server site_description: IMAP and SMTP via MCP Server site_author: ai-zerolab edit_uri: edit/main/docs/ repo_name: ai-zerolab/mcp-email-server copyright: Maintained by <a href="https://ai-zerolab.com">ai-zerolab</a>. nav: - Home: index.md - Modules: modules.md plugins: - search - mkdocstrings: handlers: python: paths: ["mcp_email_server"] theme: name: material feature: tabs: true palette: - media: "(prefers-color-scheme: light)" scheme: default primary: white accent: deep orange toggle: icon: material/brightness-7 name: Switch to dark mode - media: "(prefers-color-scheme: dark)" scheme: slate primary: black accent: deep orange toggle: icon: material/brightness-4 name: Switch to light mode icon: repo: fontawesome/brands/github extra: social: - icon: fontawesome/brands/github link: https://github.com/ai-zerolab/mcp-email-server - icon: fontawesome/brands/python link: https://pypi.org/project/mcp-email-server markdown_extensions: - toc: permalink: true - pymdownx.arithmatex: generic: true ``` -------------------------------------------------------------------------------- /mcp_email_server/emails/models.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from typing import Any from pydantic import BaseModel class EmailMetadata(BaseModel): """Email metadata""" email_id: str subject: str sender: str recipients: list[str] # Recipient list date: datetime attachments: list[str] @classmethod def from_email(cls, email: dict[str, Any]): return cls( email_id=email["email_id"], subject=email["subject"], sender=email["from"], recipients=email.get("to", []), date=email["date"], attachments=email["attachments"], ) class EmailMetadataPageResponse(BaseModel): """Paged email metadata response""" page: int page_size: int before: datetime | None since: datetime | None subject: str | None emails: list[EmailMetadata] total: int class EmailBodyResponse(BaseModel): """Single email body response""" email_id: str # IMAP UID of this email subject: str sender: str recipients: list[str] date: datetime body: str attachments: list[str] class EmailContentBatchResponse(BaseModel): """Batch email content response for multiple emails""" emails: list[EmailBodyResponse] requested_count: int retrieved_count: int failed_ids: list[str] ``` -------------------------------------------------------------------------------- /.github/workflows/main.yml: -------------------------------------------------------------------------------- ```yaml name: Main on: push: branches: - main pull_request: types: [opened, synchronize, reopened, ready_for_review] jobs: quality: runs-on: ubuntu-latest steps: - name: Check out uses: actions/checkout@v4 - uses: actions/cache@v4 with: path: ~/.cache/pre-commit key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - name: Set up the environment uses: ./.github/actions/setup-python-env - name: Run checks run: make check tests-and-type-check: runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] fail-fast: false defaults: run: shell: bash steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: ./.github/actions/setup-python-env with: python-version: ${{ matrix.python-version }} - name: Run tests run: uv run python -m pytest tests --cov --cov-config=pyproject.toml --cov-report=xml - name: Upload coverage reports to Codecov with GitHub Action on Python 3.12 uses: codecov/codecov-action@v4 if: ${{ matrix.python-version == '3.12' }} check-docs: runs-on: ubuntu-latest steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: ./.github/actions/setup-python-env - name: Check if documentation can be built run: uv run mkdocs build -s ``` -------------------------------------------------------------------------------- /dev/install_claude_desktop.py: -------------------------------------------------------------------------------- ```python import json import os import platform from pathlib import Path from jinja2 import Template _HERE = Path(__file__).parent config_template = _HERE / "claude_desktop_config.json" def generate_claude_config(): # Get current working directory pwd = Path.cwd().resolve().as_posix() # Read the template config template_content = config_template.read_text() rendered_content = Template(template_content).render(PWD=pwd) template_config = json.loads(rendered_content) # Determine the correct Claude config path based on the OS system = platform.system() if system == "Darwin": config_path = os.path.expanduser("~/Library/Application Support/Claude/claude_desktop_config.json") elif system == "Windows": config_path = os.path.join(os.environ["APPDATA"], "Claude", "claude_desktop_config.json") else: print("Unsupported operating system.") return # Read the existing config file or create an empty JSON object try: with open(config_path) as f: existing_config = json.load(f) except FileNotFoundError: existing_config = {} # Merge the template config into the existing config if "mcpServers" not in existing_config: existing_config["mcpServers"] = {} existing_config["mcpServers"].update(template_config["mcpServers"]) # Write the merged config back to the Claude config file os.makedirs(os.path.dirname(config_path), exist_ok=True) with open(config_path, "w") as f: json.dump(existing_config, f, indent=4) print( f""" Claude Desktop configuration generated successfully. $cat {config_path} {json.dumps(existing_config, indent=4)} """ ) if __name__ == "__main__": generate_claude_config() ``` -------------------------------------------------------------------------------- /tests/test_config.py: -------------------------------------------------------------------------------- ```python import pytest from pydantic import ValidationError from mcp_email_server.config import ( EmailServer, EmailSettings, ProviderSettings, get_settings, store_settings, ) def test_config(): settings = get_settings() assert settings.emails == [] settings.emails.append( EmailSettings( account_name="email_test", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test", password="test", host="imap.gmail.com", port=993, ssl=True, ), outgoing=EmailServer( user_name="test", password="test", host="smtp.gmail.com", port=587, ssl=True, ), ) ) settings.providers.append(ProviderSettings(account_name="provider_test", provider_name="test", api_key="test")) store_settings(settings) reloaded_settings = get_settings(reload=True) assert reloaded_settings == settings with pytest.raises(ValidationError): settings.add_email( EmailSettings( account_name="email_test", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test", password="test", host="imap.gmail.com", port=993, ssl=True, ), outgoing=EmailServer( user_name="test", password="test", host="smtp.gmail.com", port=587, ssl=True, ), ) ) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-email-server" version = "0.0.1" description = "IMAP and SMTP via MCP Server" authors = [{ name = "ai-zerolab", email = "[email protected]" }] readme = "README.md" keywords = ["MCP", "IMAP", "SMTP", "email"] requires-python = ">=3.10,<4.0" classifiers = [ "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ "aioimaplib>=2.0.1", "aiosmtplib>=4.0.0", "gradio>=5.18.0", "jinja2>=3.1.5", "loguru>=0.7.3", "mcp[cli]>=1.3.0", "pydantic>=2.10.6", "pydantic-settings[toml]>=2.8.0", "tomli-w>=1.2.0", "typer>=0.15.1", ] [project.scripts] mcp-email-server = "mcp_email_server.cli:app" [project.urls] Homepage = "https://ai-zerolab.github.io/mcp-email-server/" Repository = "https://github.com/ai-zerolab/mcp-email-server" Documentation = "https://ai-zerolab.github.io/mcp-email-server/" [dependency-groups] dev = [ "pytest>=7.2.0", "pytest-asyncio>=0.25.3", "pre-commit>=2.20.0", "tox-uv>=1.11.3", "deptry>=0.22.0", "pytest-cov>=4.0.0", "ruff>=0.9.2", "mkdocs>=1.4.2", "mkdocs-material>=8.5.10", "mkdocstrings[python]>=0.26.1", ] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.setuptools] py-modules = ["mcp_email_server"] [tool.pytest.ini_options] testpaths = ["tests"] [tool.ruff] target-version = "py310" line-length = 120 fix = true [tool.ruff.lint] select = [ # flake8-2020 "YTT", # flake8-bandit "S", # flake8-bugbear "B", # flake8-builtins "A", # flake8-comprehensions "C4", # flake8-debugger "T10", # flake8-simplify "SIM", # isort "I", # mccabe "C90", # pycodestyle "E", "W", # pyflakes "F", # pygrep-hooks "PGH", # pyupgrade "UP", # ruff "RUF", # tryceratops "TRY", ] ignore = [ # LineTooLong "E501", # DoNotAssignLambda "E731", # raise-vanilla-args "TRY003", # try-consider-else "TRY300", ] [tool.ruff.lint.per-file-ignores] "tests/*" = ["S101", "S106", "SIM117"] [tool.ruff.format] preview = true [tool.coverage.report] skip_empty = true [tool.coverage.run] branch = true source = ["mcp_email_server"] ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python from __future__ import annotations import asyncio import os from datetime import datetime from pathlib import Path from unittest.mock import AsyncMock import pytest from mcp_email_server.config import EmailServer, EmailSettings, ProviderSettings, delete_settings _HERE = Path(__file__).resolve().parent os.environ["MCP_EMAIL_SERVER_CONFIG_PATH"] = (_HERE / "config.toml").as_posix() os.environ["MCP_EMAIL_SERVER_LOG_LEVEL"] = "DEBUG" @pytest.fixture(autouse=True) def patch_env(monkeypatch: pytest.MonkeyPatch, tmp_path: pytest.TempPathFactory): delete_settings() yield @pytest.fixture def email_server(): """Fixture for a test EmailServer.""" return EmailServer( user_name="test_user", password="test_password", host="test.example.com", port=993, use_ssl=True, ) @pytest.fixture def email_settings(): """Fixture for test EmailSettings.""" return EmailSettings( account_name="test_account", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ), outgoing=EmailServer( user_name="test_user", password="test_password", host="smtp.example.com", port=465, use_ssl=True, ), ) @pytest.fixture def provider_settings(): """Fixture for test ProviderSettings.""" return ProviderSettings( account_name="test_provider", provider_name="test_provider", api_key="test_api_key", ) @pytest.fixture def mock_imap(): """Fixture for a mocked IMAP client.""" mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) mock_imap.wait_hello_from_server = AsyncMock() mock_imap.login = AsyncMock() mock_imap.select = AsyncMock() mock_imap.search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.fetch = AsyncMock(return_value=(None, [b"HEADER", bytearray(b"EMAIL CONTENT")])) mock_imap.logout = AsyncMock() return mock_imap @pytest.fixture def mock_smtp(): """Fixture for a mocked SMTP client.""" mock_smtp = AsyncMock() mock_smtp.__aenter__.return_value = mock_smtp mock_smtp.__aexit__.return_value = None mock_smtp.login = AsyncMock() mock_smtp.send_message = AsyncMock() return mock_smtp @pytest.fixture def sample_email_data(): """Fixture for sample email data.""" now = datetime.now() return { "subject": "Test Subject", "from": "[email protected]", "body": "Test Body", "date": now, "attachments": ["attachment.pdf"], } ``` -------------------------------------------------------------------------------- /tests/test_models.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from mcp_email_server.emails.models import EmailMetadata, EmailMetadataPageResponse class TestEmailMetadata: def test_init(self): """Test initialization with valid data.""" email_data = EmailMetadata( email_id="123", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], date=datetime.now(), attachments=["file1.txt", "file2.pdf"], ) assert email_data.subject == "Test Subject" assert email_data.sender == "[email protected]" assert email_data.recipients == ["[email protected]"] assert isinstance(email_data.date, datetime) assert email_data.attachments == ["file1.txt", "file2.pdf"] def test_from_email(self): """Test from_email class method.""" now = datetime.now() email_dict = { "email_id": "123", "subject": "Test Subject", "from": "[email protected]", "to": ["[email protected]"], "date": now, "attachments": ["file1.txt", "file2.pdf"], } email_data = EmailMetadata.from_email(email_dict) assert email_data.subject == "Test Subject" assert email_data.sender == "[email protected]" assert email_data.recipients == ["[email protected]"] assert email_data.date == now assert email_data.attachments == ["file1.txt", "file2.pdf"] class TestEmailMetadataPageResponse: def test_init(self): """Test initialization with valid data.""" now = datetime.now() email_data = EmailMetadata( email_id="123", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], date=now, attachments=[], ) response = EmailMetadataPageResponse( page=1, page_size=10, before=now, since=None, subject="Test", emails=[email_data], total=1, ) assert response.page == 1 assert response.page_size == 10 assert response.before == now assert response.since is None assert response.subject == "Test" assert len(response.emails) == 1 assert response.emails[0] == email_data assert response.total == 1 def test_empty_emails(self): """Test with empty email list.""" response = EmailMetadataPageResponse( page=1, page_size=10, before=None, since=None, subject=None, emails=[], total=0, ) assert response.page == 1 assert response.page_size == 10 assert len(response.emails) == 0 assert response.total == 0 ``` -------------------------------------------------------------------------------- /.github/workflows/on-release-main.yml: -------------------------------------------------------------------------------- ```yaml name: release-main permissions: contents: write packages: write on: release: types: [published] jobs: set-version: runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - name: Export tag id: vars run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT if: ${{ github.event_name == 'release' }} - name: Update project version run: | sed -i "s/^version = \".*\"/version = \"$RELEASE_VERSION\"/" pyproject.toml env: RELEASE_VERSION: ${{ steps.vars.outputs.tag }} if: ${{ github.event_name == 'release' }} - name: Upload updated pyproject.toml uses: actions/upload-artifact@v4 with: name: pyproject-toml path: pyproject.toml publish: runs-on: ubuntu-latest needs: [set-version] steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: ./.github/actions/setup-python-env - name: Download updated pyproject.toml uses: actions/download-artifact@v4 with: name: pyproject-toml - name: Build package run: uv build - name: Publish package run: uv publish env: UV_PUBLISH_TOKEN: ${{ secrets.PYPI_TOKEN }} - name: Upload dists to release uses: svenstaro/upload-release-action@v2 with: repo_token: ${{ secrets.GITHUB_TOKEN }} file: dist/* file_glob: true tag: ${{ github.ref }} overwrite: true push-image: runs-on: ubuntu-latest needs: [set-version] steps: - name: Check out uses: actions/checkout@v4 - name: Export tag id: vars run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT if: ${{ github.event_name == 'release' }} - name: Set up QEMU uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Login to Github Container Registry uses: docker/login-action@v3 with: registry: ghcr.io username: ai-zerolab password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push image id: docker_build_publish uses: docker/build-push-action@v5 with: context: . platforms: linux/amd64,linux/arm64/v8 cache-from: type=gha cache-to: type=gha,mode=max file: ./Dockerfile push: true tags: | ghcr.io/ai-zerolab/mcp-email-server:${{ steps.vars.outputs.tag }} ghcr.io/ai-zerolab/mcp-email-server:latest deploy-docs: needs: publish runs-on: ubuntu-latest steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: ./.github/actions/setup-python-env - name: Deploy documentation run: uv run mkdocs gh-deploy --force ``` -------------------------------------------------------------------------------- /tests/test_dispatcher.py: -------------------------------------------------------------------------------- ```python from unittest.mock import MagicMock, patch import pytest from mcp_email_server.config import EmailServer, EmailSettings, ProviderSettings from mcp_email_server.emails.classic import ClassicEmailHandler from mcp_email_server.emails.dispatcher import dispatch_handler class TestDispatcher: def test_dispatch_handler_with_email_settings(self): """Test dispatch_handler with valid email account.""" # Create test email settings email_settings = EmailSettings( account_name="test_account", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ), outgoing=EmailServer( user_name="test_user", password="test_password", host="smtp.example.com", port=465, use_ssl=True, ), ) # Mock the get_settings function to return our settings mock_settings = MagicMock() mock_settings.get_account.return_value = email_settings with patch("mcp_email_server.emails.dispatcher.get_settings", return_value=mock_settings): # Call the function handler = dispatch_handler("test_account") # Verify the result assert isinstance(handler, ClassicEmailHandler) assert handler.email_settings == email_settings # Verify get_account was called correctly mock_settings.get_account.assert_called_once_with("test_account") def test_dispatch_handler_with_provider_settings(self): """Test dispatch_handler with provider account (should raise NotImplementedError).""" # Create test provider settings provider_settings = ProviderSettings( account_name="test_provider", provider_name="test", api_key="test_key", ) # Mock the get_settings function to return our settings mock_settings = MagicMock() mock_settings.get_account.return_value = provider_settings with patch("mcp_email_server.emails.dispatcher.get_settings", return_value=mock_settings): # Call the function and expect NotImplementedError with pytest.raises(NotImplementedError): dispatch_handler("test_provider") # Verify get_account was called correctly mock_settings.get_account.assert_called_once_with("test_provider") def test_dispatch_handler_with_nonexistent_account(self): """Test dispatch_handler with non-existent account (should raise ValueError).""" # Mock the get_settings function to return None for get_account mock_settings = MagicMock() mock_settings.get_account.return_value = None mock_settings.get_accounts.return_value = ["account1", "account2"] with patch("mcp_email_server.emails.dispatcher.get_settings", return_value=mock_settings): # Call the function and expect ValueError with pytest.raises(ValueError) as excinfo: dispatch_handler("nonexistent_account") # Verify the error message assert "Account nonexistent_account not found" in str(excinfo.value) # Verify get_account was called correctly mock_settings.get_account.assert_called_once_with("nonexistent_account") mock_settings.get_accounts.assert_called_once() ``` -------------------------------------------------------------------------------- /mcp_email_server/app.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from typing import Annotated, Literal from mcp.server.fastmcp import FastMCP from pydantic import Field from mcp_email_server.config import ( AccountAttributes, EmailSettings, ProviderSettings, get_settings, ) from mcp_email_server.emails.dispatcher import dispatch_handler from mcp_email_server.emails.models import EmailContentBatchResponse, EmailMetadataPageResponse mcp = FastMCP("email") @mcp.resource("email://{account_name}") async def get_account(account_name: str) -> EmailSettings | ProviderSettings | None: settings = get_settings() return settings.get_account(account_name, masked=True) @mcp.tool(description="List all configured email accounts with masked credentials.") async def list_available_accounts() -> list[AccountAttributes]: settings = get_settings() return [account.masked() for account in settings.get_accounts()] @mcp.tool(description="Add a new email account configuration to the settings.") async def add_email_account(email: EmailSettings) -> str: settings = get_settings() settings.add_email(email) settings.store() return f"Successfully added email account '{email.account_name}'" @mcp.tool( description="List email metadata (email_id, subject, sender, recipients, date) without body content. Returns email_id for use with get_emails_content." ) async def list_emails_metadata( account_name: Annotated[str, Field(description="The name of the email account.")], page: Annotated[ int, Field(default=1, description="The page number to retrieve (starting from 1)."), ] = 1, page_size: Annotated[int, Field(default=10, description="The number of emails to retrieve per page.")] = 10, before: Annotated[ datetime | None, Field(default=None, description="Retrieve emails before this datetime (UTC)."), ] = None, since: Annotated[ datetime | None, Field(default=None, description="Retrieve emails since this datetime (UTC)."), ] = None, subject: Annotated[str | None, Field(default=None, description="Filter emails by subject.")] = None, from_address: Annotated[str | None, Field(default=None, description="Filter emails by sender address.")] = None, to_address: Annotated[ str | None, Field(default=None, description="Filter emails by recipient address."), ] = None, order: Annotated[ Literal["asc", "desc"], Field(default=None, description="Order emails by field. `asc` or `desc`."), ] = "desc", ) -> EmailMetadataPageResponse: handler = dispatch_handler(account_name) return await handler.get_emails_metadata( page=page, page_size=page_size, before=before, since=since, subject=subject, from_address=from_address, to_address=to_address, order=order, ) @mcp.tool( description="Get the full content (including body) of one or more emails by their email_id. Use list_emails_metadata first to get the email_id." ) async def get_emails_content( account_name: Annotated[str, Field(description="The name of the email account.")], email_ids: Annotated[ list[str], Field( description="List of email_id to retrieve (obtained from list_emails_metadata). Can be a single email_id or multiple email_ids." ), ], ) -> EmailContentBatchResponse: handler = dispatch_handler(account_name) return await handler.get_emails_content(email_ids) @mcp.tool( description="Send an email using the specified account. Recipient should be a list of email addresses.", ) async def send_email( account_name: Annotated[str, Field(description="The name of the email account to send from.")], recipients: Annotated[list[str], Field(description="A list of recipient email addresses.")], subject: Annotated[str, Field(description="The subject of the email.")], body: Annotated[str, Field(description="The body of the email.")], cc: Annotated[ list[str] | None, Field(default=None, description="A list of CC email addresses."), ] = None, bcc: Annotated[ list[str] | None, Field(default=None, description="A list of BCC email addresses."), ] = None, html: Annotated[ bool, Field(default=False, description="Whether to send the email as HTML (True) or plain text (False)."), ] = False, ) -> str: handler = dispatch_handler(account_name) await handler.send_email(recipients, subject, body, cc, bcc, html) recipient_str = ", ".join(recipients) return f"Email sent successfully to {recipient_str}" ``` -------------------------------------------------------------------------------- /mcp_email_server/tools/installer.py: -------------------------------------------------------------------------------- ```python import json import os import platform import shutil import sys from pathlib import Path from jinja2 import Template _HERE = Path(__file__).parent CLAUDE_DESKTOP_CONFIG_TEMPLATE = _HERE / "claude_desktop_config.json" system = platform.system() if system == "Darwin": CLAUDE_DESKTOP_CONFIG_PATH = os.path.expanduser("~/Library/Application Support/Claude/claude_desktop_config.json") elif system == "Windows": CLAUDE_DESKTOP_CONFIG_PATH = os.path.join(os.environ["APPDATA"], "Claude", "claude_desktop_config.json") else: CLAUDE_DESKTOP_CONFIG_PATH = None def get_endpoint_path() -> str: """ Find the path to the mcp-email-server script. Similar to the 'which' command in Unix-like systems. Returns: str: The full path to the mcp-email-server script """ # First try using shutil.which to find the script in PATH script_path = shutil.which("mcp-email-server") if script_path: return script_path # If not found in PATH, try to find it in the current Python environment # This handles cases where the script is installed but not in PATH bin_dir = Path(sys.executable).parent possible_paths = [ bin_dir / "mcp-email-server", bin_dir / "mcp-email-server.exe", # For Windows ] for path in possible_paths: if path.exists(): return str(path) # If we can't find it, return the script name and hope it's in PATH when executed return "mcp-email-server" def install_claude_desktop(): # Read the template config template_content = CLAUDE_DESKTOP_CONFIG_TEMPLATE.read_text() rendered_content = Template(template_content).render(ENTRYPOINT=get_endpoint_path()) template_config = json.loads(rendered_content) if not CLAUDE_DESKTOP_CONFIG_PATH: raise NotImplementedError # Read the existing config file or create an empty JSON object try: with open(CLAUDE_DESKTOP_CONFIG_PATH) as f: existing_config = json.load(f) except FileNotFoundError: existing_config = {} # Merge the template config into the existing config if "mcpServers" not in existing_config: existing_config["mcpServers"] = {} existing_config["mcpServers"].update(template_config["mcpServers"]) # Write the merged config back to the Claude config file os.makedirs(os.path.dirname(CLAUDE_DESKTOP_CONFIG_PATH), exist_ok=True) with open(CLAUDE_DESKTOP_CONFIG_PATH, "w") as f: json.dump(existing_config, f, indent=4) def uninstall_claude_desktop(): if not CLAUDE_DESKTOP_CONFIG_PATH: raise NotImplementedError try: with open(CLAUDE_DESKTOP_CONFIG_PATH) as f: existing_config = json.load(f) except FileNotFoundError: return if "mcpServers" not in existing_config: return if "zerolib-email" in existing_config["mcpServers"]: del existing_config["mcpServers"]["zerolib-email"] with open(CLAUDE_DESKTOP_CONFIG_PATH, "w") as f: json.dump(existing_config, f, indent=4) def is_installed() -> bool: """ Check if the MCP email server is installed in the Claude desktop configuration. Returns: bool: True if installed, False otherwise """ if not CLAUDE_DESKTOP_CONFIG_PATH: return False try: with open(CLAUDE_DESKTOP_CONFIG_PATH) as f: config = json.load(f) return "mcpServers" in config and "zerolib-email" in config["mcpServers"] except (FileNotFoundError, json.JSONDecodeError): return False def need_update() -> bool: """ Check if the installed configuration needs to be updated. Returns: bool: True if an update is needed, False otherwise """ if not is_installed(): return True try: # Get the template config template_content = CLAUDE_DESKTOP_CONFIG_TEMPLATE.read_text() rendered_content = Template(template_content).render(ENTRYPOINT=get_endpoint_path()) template_config = json.loads(rendered_content) # Get the installed config with open(CLAUDE_DESKTOP_CONFIG_PATH) as f: installed_config = json.load(f) # Compare the relevant parts of the configs template_server = template_config["mcpServers"]["zerolib-email"] installed_server = installed_config["mcpServers"]["zerolib-email"] # Check if any key configuration elements differ return ( template_server.get("command") != installed_server.get("command") or template_server.get("args") != installed_server.get("args") or template_server.get("env") != installed_server.get("env") ) except (FileNotFoundError, json.JSONDecodeError, KeyError): # If any error occurs during comparison, suggest an update return True def get_claude_desktop_config() -> str: if not CLAUDE_DESKTOP_CONFIG_PATH: raise NotImplementedError with open(CLAUDE_DESKTOP_CONFIG_PATH) as f: return f.read() ``` -------------------------------------------------------------------------------- /tests/test_classic_handler.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from unittest.mock import AsyncMock, patch import pytest from mcp_email_server.config import EmailServer, EmailSettings from mcp_email_server.emails.classic import ClassicEmailHandler, EmailClient from mcp_email_server.emails.models import EmailMetadata, EmailMetadataPageResponse @pytest.fixture def email_settings(): return EmailSettings( account_name="test_account", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ), outgoing=EmailServer( user_name="test_user", password="test_password", host="smtp.example.com", port=465, use_ssl=True, ), ) @pytest.fixture def classic_handler(email_settings): return ClassicEmailHandler(email_settings) class TestClassicEmailHandler: def test_init(self, email_settings): """Test initialization of ClassicEmailHandler.""" handler = ClassicEmailHandler(email_settings) assert handler.email_settings == email_settings assert isinstance(handler.incoming_client, EmailClient) assert isinstance(handler.outgoing_client, EmailClient) # Check that clients are initialized correctly assert handler.incoming_client.email_server == email_settings.incoming assert handler.outgoing_client.email_server == email_settings.outgoing assert handler.outgoing_client.sender == f"{email_settings.full_name} <{email_settings.email_address}>" @pytest.mark.asyncio async def test_get_emails(self, classic_handler): """Test get_emails method.""" # Create test data now = datetime.now() email_data = { "email_id": "123", "subject": "Test Subject", "from": "[email protected]", "to": ["[email protected]"], "date": now, "attachments": [], } # Mock the get_emails_stream method to yield our test data mock_stream = AsyncMock() mock_stream.__aiter__.return_value = [email_data] # Mock the get_email_count method mock_count = AsyncMock(return_value=1) # Apply the mocks with patch.object(classic_handler.incoming_client, "get_emails_metadata_stream", return_value=mock_stream): with patch.object(classic_handler.incoming_client, "get_email_count", mock_count): # Call the method result = await classic_handler.get_emails_metadata( page=1, page_size=10, before=now, since=None, subject="Test", from_address="[email protected]", to_address=None, ) # Verify the result assert isinstance(result, EmailMetadataPageResponse) assert result.page == 1 assert result.page_size == 10 assert result.before == now assert result.since is None assert result.subject == "Test" assert len(result.emails) == 1 assert isinstance(result.emails[0], EmailMetadata) assert result.emails[0].subject == "Test Subject" assert result.emails[0].sender == "[email protected]" assert result.emails[0].date == now assert result.emails[0].attachments == [] assert result.total == 1 # Verify the client methods were called correctly classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with( 1, 10, now, None, "Test", "[email protected]", None, "desc" ) mock_count.assert_called_once_with( now, None, "Test", from_address="[email protected]", to_address=None ) @pytest.mark.asyncio async def test_send_email(self, classic_handler): """Test send_email method.""" # Mock the outgoing_client.send_email method mock_send = AsyncMock() # Apply the mock with patch.object(classic_handler.outgoing_client, "send_email", mock_send): # Call the method await classic_handler.send_email( recipients=["[email protected]"], subject="Test Subject", body="Test Body", cc=["[email protected]"], bcc=["[email protected]"], ) # Verify the client method was called correctly mock_send.assert_called_once_with( ["[email protected]"], "Test Subject", "Test Body", ["[email protected]"], ["[email protected]"], False, ) ``` -------------------------------------------------------------------------------- /tests/test_mcp_tools.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest from mcp_email_server.app import ( add_email_account, get_emails_content, list_available_accounts, list_emails_metadata, send_email, ) from mcp_email_server.config import EmailServer, EmailSettings, ProviderSettings from mcp_email_server.emails.models import ( EmailBodyResponse, EmailContentBatchResponse, EmailMetadata, EmailMetadataPageResponse, ) class TestMcpTools: @pytest.mark.asyncio async def test_list_available_accounts(self): """Test list_available_accounts MCP tool.""" # Create test accounts email_settings = EmailSettings( account_name="test_email", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ), outgoing=EmailServer( user_name="test_user", password="test_password", host="smtp.example.com", port=465, use_ssl=True, ), ) provider_settings = ProviderSettings( account_name="test_provider", provider_name="test", api_key="test_key", ) # Mock the get_settings function mock_settings = MagicMock() mock_settings.get_accounts.return_value = [email_settings, provider_settings] with patch("mcp_email_server.app.get_settings", return_value=mock_settings): # Call the function result = await list_available_accounts() # Verify the result assert len(result) == 2 assert result[0].account_name == "test_email" assert result[1].account_name == "test_provider" # Verify get_accounts was called correctly mock_settings.get_accounts.assert_called_once() @pytest.mark.asyncio async def test_add_email_account(self): """Test add_email_account MCP tool.""" # Create test email settings email_settings = EmailSettings( account_name="test_account", full_name="Test User", email_address="[email protected]", incoming=EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ), outgoing=EmailServer( user_name="test_user", password="test_password", host="smtp.example.com", port=465, use_ssl=True, ), ) # Mock the get_settings function mock_settings = MagicMock() with patch("mcp_email_server.app.get_settings", return_value=mock_settings): # Call the function result = await add_email_account(email_settings) # Verify the return value assert result == "Successfully added email account 'test_account'" # Verify add_email and store were called correctly mock_settings.add_email.assert_called_once_with(email_settings) mock_settings.store.assert_called_once() @pytest.mark.asyncio async def test_list_emails_metadata(self): """Test list_emails_metadata MCP tool.""" # Create test data now = datetime.now() email_metadata = EmailMetadata( email_id="12345", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], date=now, attachments=[], ) email_metadata_page = EmailMetadataPageResponse( page=1, page_size=10, before=now, since=None, subject="Test", emails=[email_metadata], total=1, ) # Mock the dispatch_handler function mock_handler = AsyncMock() mock_handler.get_emails_metadata.return_value = email_metadata_page with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): # Call the function result = await list_emails_metadata( account_name="test_account", page=1, page_size=10, before=now, since=None, subject="Test", from_address="[email protected]", to_address=None, ) # Verify the result assert result == email_metadata_page assert result.page == 1 assert result.page_size == 10 assert result.before == now assert result.subject == "Test" assert len(result.emails) == 1 assert result.emails[0].subject == "Test Subject" assert result.emails[0].email_id == "12345" # Verify dispatch_handler and get_emails_metadata were called correctly mock_handler.get_emails_metadata.assert_called_once_with( page=1, page_size=10, before=now, since=None, subject="Test", from_address="[email protected]", to_address=None, order="desc", ) @pytest.mark.asyncio async def test_get_emails_content_single(self): """Test get_emails_content MCP tool with single email.""" # Create test data now = datetime.now() email_body = EmailBodyResponse( email_id="12345", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], date=now, body="This is the test email body content.", attachments=["attachment1.pdf"], ) batch_response = EmailContentBatchResponse( emails=[email_body], requested_count=1, retrieved_count=1, failed_ids=[], ) # Mock the dispatch_handler function mock_handler = AsyncMock() mock_handler.get_emails_content.return_value = batch_response with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): # Call the function result = await get_emails_content( account_name="test_account", email_ids=["12345"], ) # Verify the result assert result == batch_response assert result.requested_count == 1 assert result.retrieved_count == 1 assert len(result.failed_ids) == 0 assert len(result.emails) == 1 assert result.emails[0].email_id == "12345" assert result.emails[0].subject == "Test Subject" # Verify dispatch_handler and get_emails_content were called correctly mock_handler.get_emails_content.assert_called_once_with(["12345"]) @pytest.mark.asyncio async def test_get_emails_content_batch(self): """Test get_emails_content MCP tool with multiple emails.""" # Create test data now = datetime.now() email1 = EmailBodyResponse( email_id="12345", subject="Test Subject 1", sender="[email protected]", recipients=["[email protected]"], date=now, body="This is the first test email body content.", attachments=[], ) email2 = EmailBodyResponse( email_id="12346", subject="Test Subject 2", sender="[email protected]", recipients=["[email protected]"], date=now, body="This is the second test email body content.", attachments=["attachment1.pdf"], ) batch_response = EmailContentBatchResponse( emails=[email1, email2], requested_count=3, retrieved_count=2, failed_ids=["12347"], ) # Mock the dispatch_handler function mock_handler = AsyncMock() mock_handler.get_emails_content.return_value = batch_response with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): # Call the function result = await get_emails_content( account_name="test_account", email_ids=["12345", "12346", "12347"], ) # Verify the result assert result == batch_response assert result.requested_count == 3 assert result.retrieved_count == 2 assert len(result.failed_ids) == 1 assert result.failed_ids[0] == "12347" assert len(result.emails) == 2 assert result.emails[0].email_id == "12345" assert result.emails[1].email_id == "12346" # Verify dispatch_handler and get_emails_content were called correctly mock_handler.get_emails_content.assert_called_once_with(["12345", "12346", "12347"]) @pytest.mark.asyncio async def test_send_email(self): """Test send_email MCP tool.""" # Mock the dispatch_handler function mock_handler = AsyncMock() with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): # Call the function result = await send_email( account_name="test_account", recipients=["[email protected]"], subject="Test Subject", body="Test Body", cc=["[email protected]"], bcc=["[email protected]"], ) # Verify the return value assert result == "Email sent successfully to [email protected]" # Verify send_email was called correctly mock_handler.send_email.assert_called_once_with( ["[email protected]"], "Test Subject", "Test Body", ["[email protected]"], ["[email protected]"], False, ) ``` -------------------------------------------------------------------------------- /tests/test_email_client.py: -------------------------------------------------------------------------------- ```python import asyncio import email from datetime import datetime from email.mime.text import MIMEText from unittest.mock import AsyncMock, MagicMock, patch import pytest from mcp_email_server.config import EmailServer from mcp_email_server.emails.classic import EmailClient @pytest.fixture def email_server(): return EmailServer( user_name="test_user", password="test_password", host="imap.example.com", port=993, use_ssl=True, ) @pytest.fixture def email_client(email_server): return EmailClient(email_server, sender="Test User <[email protected]>") class TestEmailClient: def test_init(self, email_server): """Test initialization of EmailClient.""" client = EmailClient(email_server) assert client.email_server == email_server assert client.sender == email_server.user_name assert client.smtp_use_tls is True assert client.smtp_start_tls is False # Test with custom sender custom_sender = "Custom <[email protected]>" client = EmailClient(email_server, sender=custom_sender) assert client.sender == custom_sender def test_parse_email_data_plain(self): """Test parsing plain text email.""" # Create a simple plain text email msg = MIMEText("This is a test email body") msg["Subject"] = "Test Subject" msg["From"] = "[email protected]" msg["To"] = "[email protected]" msg["Date"] = email.utils.formatdate() raw_email = msg.as_bytes() client = EmailClient(MagicMock()) result = client._parse_email_data(raw_email) assert result["subject"] == "Test Subject" assert result["from"] == "[email protected]" assert result["body"] == "This is a test email body" assert isinstance(result["date"], datetime) assert result["attachments"] == [] def test_parse_email_data_with_attachments(self): """Test parsing email with attachments.""" # This would require creating a multipart email with attachments # For simplicity, we'll mock the email parsing with patch("email.parser.BytesParser.parsebytes") as mock_parse: mock_email = MagicMock() mock_email.get.side_effect = lambda x, default=None: { "Subject": "Test Subject", "From": "[email protected]", "Date": email.utils.formatdate(), }.get(x, default) mock_email.is_multipart.return_value = True # Mock parts text_part = MagicMock() text_part.get_content_type.return_value = "text/plain" text_part.get.return_value = "" # Not an attachment text_part.get_payload.return_value = b"This is the email body" text_part.get_content_charset.return_value = "utf-8" attachment_part = MagicMock() attachment_part.get_content_type.return_value = "application/pdf" attachment_part.get.return_value = "attachment; filename=test.pdf" attachment_part.get_filename.return_value = "test.pdf" mock_email.walk.return_value = [text_part, attachment_part] mock_parse.return_value = mock_email client = EmailClient(MagicMock()) result = client._parse_email_data(b"dummy email content") assert result["subject"] == "Test Subject" assert result["from"] == "[email protected]" assert result["body"] == "This is the email body" assert isinstance(result["date"], datetime) assert result["attachments"] == ["test.pdf"] def test_build_search_criteria(self): """Test building search criteria for IMAP.""" # Test with no criteria (should return ["ALL"]) criteria = EmailClient._build_search_criteria() assert criteria == ["ALL"] # Test with before date before_date = datetime(2023, 1, 1) criteria = EmailClient._build_search_criteria(before=before_date) assert criteria == ["BEFORE", "01-JAN-2023"] # Test with since date since_date = datetime(2023, 1, 1) criteria = EmailClient._build_search_criteria(since=since_date) assert criteria == ["SINCE", "01-JAN-2023"] # Test with subject criteria = EmailClient._build_search_criteria(subject="Test") assert criteria == ["SUBJECT", "Test"] # Test with body criteria = EmailClient._build_search_criteria(body="Test") assert criteria == ["BODY", "Test"] # Test with text criteria = EmailClient._build_search_criteria(text="Test") assert criteria == ["TEXT", "Test"] # Test with from_address criteria = EmailClient._build_search_criteria(from_address="[email protected]") assert criteria == ["FROM", "[email protected]"] # Test with to_address criteria = EmailClient._build_search_criteria(to_address="[email protected]") assert criteria == ["TO", "[email protected]"] # Test with multiple criteria criteria = EmailClient._build_search_criteria( subject="Test", from_address="[email protected]", since=datetime(2023, 1, 1) ) assert criteria == ["SINCE", "01-JAN-2023", "SUBJECT", "Test", "FROM", "[email protected]"] @pytest.mark.asyncio async def test_get_emails_stream(self, email_client): """Test getting emails stream.""" # Mock IMAP client mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) mock_imap.wait_hello_from_server = AsyncMock() mock_imap.login = AsyncMock() mock_imap.select = AsyncMock() mock_imap.search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.uid_search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.fetch = AsyncMock(return_value=(None, [b"HEADER", bytearray(b"EMAIL CONTENT")])) # Create a simple email with headers for testing test_email = b"""From: [email protected]\r To: [email protected]\r Subject: Test Subject\r Date: Mon, 1 Jan 2024 00:00:00 +0000\r \r This is the email body.""" mock_imap.uid = AsyncMock( return_value=(None, [b"1 FETCH (UID 1 RFC822 {%d}" % len(test_email), bytearray(test_email)]) ) mock_imap.logout = AsyncMock() # Mock IMAP class with patch.object(email_client, "imap_class", return_value=mock_imap): # Mock _parse_email_data with patch.object(email_client, "_parse_email_data") as mock_parse: mock_parse.return_value = { "subject": "Test Subject", "from": "[email protected]", "body": "Test Body", "date": datetime.now(), "attachments": [], } emails = [] async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): emails.append(email_data) # We should get 3 emails (from the mocked search result "1 2 3") assert len(emails) == 3 assert emails[0]["subject"] == "Test Subject" assert emails[0]["from"] == "[email protected]" # Verify IMAP methods were called correctly mock_imap.login.assert_called_once_with( email_client.email_server.user_name, email_client.email_server.password ) mock_imap.select.assert_called_once_with("INBOX") mock_imap.uid_search.assert_called_once_with("ALL") assert mock_imap.uid.call_count == 3 mock_imap.logout.assert_called_once() @pytest.mark.asyncio async def test_get_email_count(self, email_client): """Test getting email count.""" # Mock IMAP client mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) mock_imap.wait_hello_from_server = AsyncMock() mock_imap.login = AsyncMock() mock_imap.select = AsyncMock() mock_imap.search = AsyncMock(return_value=(None, [b"1 2 3 4 5"])) mock_imap.uid_search = AsyncMock(return_value=(None, [b"1 2 3 4 5"])) mock_imap.logout = AsyncMock() # Mock IMAP class with patch.object(email_client, "imap_class", return_value=mock_imap): count = await email_client.get_email_count() assert count == 5 # Verify IMAP methods were called correctly mock_imap.login.assert_called_once_with( email_client.email_server.user_name, email_client.email_server.password ) mock_imap.select.assert_called_once_with("INBOX") mock_imap.uid_search.assert_called_once_with("ALL") mock_imap.logout.assert_called_once() @pytest.mark.asyncio async def test_send_email(self, email_client): """Test sending email.""" # Mock SMTP client mock_smtp = AsyncMock() mock_smtp.__aenter__.return_value = mock_smtp mock_smtp.__aexit__.return_value = None mock_smtp.login = AsyncMock() mock_smtp.send_message = AsyncMock() with patch("aiosmtplib.SMTP", return_value=mock_smtp): await email_client.send_email( recipients=["[email protected]"], subject="Test Subject", body="Test Body", cc=["[email protected]"], bcc=["[email protected]"], ) # Verify SMTP methods were called correctly mock_smtp.login.assert_called_once_with( email_client.email_server.user_name, email_client.email_server.password ) mock_smtp.send_message.assert_called_once() # Check that the message was constructed correctly call_args = mock_smtp.send_message.call_args msg = call_args[0][0] recipients = call_args[1]["recipients"] assert msg["Subject"] == "Test Subject" assert msg["From"] == email_client.sender assert msg["To"] == "[email protected]" assert msg["Cc"] == "[email protected]" assert "Bcc" not in msg # BCC should not be in headers # Check that all recipients are included in the SMTP call assert "[email protected]" in recipients assert "[email protected]" in recipients assert "[email protected]" in recipients ``` -------------------------------------------------------------------------------- /tests/test_env_config_coverage.py: -------------------------------------------------------------------------------- ```python """Focused tests to achieve patch coverage for environment variable configuration.""" import os from mcp_email_server.config import EmailSettings, Settings def test_from_env_missing_email_and_password(monkeypatch): """Test return None when email/password missing - covers line 138.""" # No environment variables set result = EmailSettings.from_env() assert result is None # Only email, no password monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") result = EmailSettings.from_env() assert result is None # Only password, no email monkeypatch.delenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", raising=False) monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "pass") result = EmailSettings.from_env() assert result is None def test_from_env_missing_hosts_warning(monkeypatch): """Test logger.warning for missing hosts - covers lines 154-156.""" monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "pass") # Missing both hosts result = EmailSettings.from_env() assert result is None # Missing SMTP host monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.test.com") result = EmailSettings.from_env() assert result is None # Missing IMAP host monkeypatch.delenv("MCP_EMAIL_SERVER_IMAP_HOST") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.test.com") result = EmailSettings.from_env() assert result is None def test_from_env_exception_handling(monkeypatch): """Test exception handling in try/except - covers lines 177-179.""" monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "pass") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.test.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.test.com") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_PORT", "invalid") # Will cause ValueError result = EmailSettings.from_env() assert result is None def test_from_env_success_with_all_defaults(monkeypatch): """Test successful creation with defaults - covers lines 147-176.""" monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "pass") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.example.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.example.com") result = EmailSettings.from_env() assert result is not None assert result.account_name == "default" assert result.full_name == "user" assert result.email_address == "[email protected]" assert result.incoming.user_name == "[email protected]" assert result.incoming.port == 993 assert result.outgoing.port == 465 def test_from_env_with_all_vars_set(monkeypatch): """Test with all environment variables set - covers parse_bool branches.""" env_vars = { "MCP_EMAIL_SERVER_ACCOUNT_NAME": "myaccount", "MCP_EMAIL_SERVER_FULL_NAME": "John Doe", "MCP_EMAIL_SERVER_EMAIL_ADDRESS": "[email protected]", "MCP_EMAIL_SERVER_USER_NAME": "johnuser", "MCP_EMAIL_SERVER_PASSWORD": "pass123", "MCP_EMAIL_SERVER_IMAP_HOST": "imap.example.com", "MCP_EMAIL_SERVER_IMAP_PORT": "143", "MCP_EMAIL_SERVER_IMAP_SSL": "false", "MCP_EMAIL_SERVER_SMTP_HOST": "smtp.example.com", "MCP_EMAIL_SERVER_SMTP_PORT": "587", "MCP_EMAIL_SERVER_SMTP_SSL": "no", "MCP_EMAIL_SERVER_SMTP_START_SSL": "yes", "MCP_EMAIL_SERVER_IMAP_USER_NAME": "imap_john", "MCP_EMAIL_SERVER_IMAP_PASSWORD": "imap_pass", "MCP_EMAIL_SERVER_SMTP_USER_NAME": "smtp_john", "MCP_EMAIL_SERVER_SMTP_PASSWORD": "smtp_pass", } for key, value in env_vars.items(): monkeypatch.setenv(key, value) result = EmailSettings.from_env() assert result is not None assert result.account_name == "myaccount" assert result.full_name == "John Doe" assert result.incoming.user_name == "imap_john" assert result.incoming.password == "imap_pass" # noqa: S105 assert result.incoming.port == 143 assert result.incoming.use_ssl is False assert result.outgoing.user_name == "smtp_john" assert result.outgoing.password == "smtp_pass" # noqa: S105 assert result.outgoing.port == 587 assert result.outgoing.use_ssl is False assert result.outgoing.start_ssl is True def test_from_env_boolean_parsing_variations(monkeypatch): """Test various boolean value parsing - covers parse_bool function.""" base_env = { "MCP_EMAIL_SERVER_EMAIL_ADDRESS": "[email protected]", "MCP_EMAIL_SERVER_PASSWORD": "pass", "MCP_EMAIL_SERVER_IMAP_HOST": "imap.test.com", "MCP_EMAIL_SERVER_SMTP_HOST": "smtp.test.com", } # Test "1" = true monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_SSL", "1") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_SSL", "0") for key, value in base_env.items(): monkeypatch.setenv(key, value) result = EmailSettings.from_env() assert result.incoming.use_ssl is True assert result.outgoing.use_ssl is False # Test "on"/"off" monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_SSL", "on") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_START_SSL", "off") result = EmailSettings.from_env() assert result.incoming.use_ssl is True assert result.outgoing.start_ssl is False def test_settings_init_no_env(monkeypatch, tmp_path): """Test Settings.__init__ when no env vars - covers line 211 false branch.""" config_file = tmp_path / "empty.toml" config_file.write_text("") monkeypatch.setenv("MCP_EMAIL_SERVER_CONFIG_PATH", str(config_file)) # Clear any email env vars for key in list(os.environ.keys()): if key.startswith("MCP_EMAIL_SERVER_") and "CONFIG_PATH" not in key: monkeypatch.delenv(key, raising=False) settings = Settings() assert len(settings.emails) == 0 def test_settings_init_add_new_account(monkeypatch, tmp_path): """Test adding new account from env - covers lines 225-226.""" config_file = tmp_path / "empty.toml" config_file.write_text("") monkeypatch.setenv("MCP_EMAIL_SERVER_CONFIG_PATH", str(config_file)) monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "newpass") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.new.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.new.com") monkeypatch.setenv("MCP_EMAIL_SERVER_ACCOUNT_NAME", "newaccount") settings = Settings() assert len(settings.emails) == 1 assert settings.emails[0].account_name == "newaccount" def test_settings_init_override_existing(monkeypatch, tmp_path): """Test overriding existing TOML account - covers lines 214-222.""" config_file = tmp_path / "config.toml" config_file.write_text(""" [[emails]] account_name = "existing" full_name = "Old Name" email_address = "[email protected]" created_at = "2025-01-01T00:00:00" updated_at = "2025-01-01T00:00:00" [emails.incoming] user_name = "olduser" password = "oldpass" host = "imap.old.com" port = 993 use_ssl = true [emails.outgoing] user_name = "olduser" password = "oldpass" host = "smtp.old.com" port = 465 use_ssl = true """) monkeypatch.setenv("MCP_EMAIL_SERVER_CONFIG_PATH", str(config_file)) monkeypatch.setenv("MCP_EMAIL_SERVER_ACCOUNT_NAME", "existing") monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "newpass") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.new.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.new.com") settings = Settings() assert len(settings.emails) == 1 assert settings.emails[0].account_name == "existing" assert settings.emails[0].email_address == "[email protected]" # Overridden def test_settings_init_loop_through_multiple_accounts(monkeypatch, tmp_path): """Test loop iteration with multiple accounts - covers lines 214-217.""" config_file = tmp_path / "multi.toml" config_file.write_text(""" [[emails]] account_name = "first" full_name = "First" email_address = "[email protected]" created_at = "2025-01-01T00:00:00" updated_at = "2025-01-01T00:00:00" [emails.incoming] user_name = "first" password = "pass1" host = "imap.first.com" port = 993 use_ssl = true [emails.outgoing] user_name = "first" password = "pass1" host = "smtp.first.com" port = 465 use_ssl = true [[emails]] account_name = "second" full_name = "Second" email_address = "[email protected]" created_at = "2025-01-01T00:00:00" updated_at = "2025-01-01T00:00:00" [emails.incoming] user_name = "second" password = "pass2" host = "imap.second.com" port = 993 use_ssl = true [emails.outgoing] user_name = "second" password = "pass2" host = "smtp.second.com" port = 465 use_ssl = true [[emails]] account_name = "third" full_name = "Third" email_address = "[email protected]" created_at = "2025-01-01T00:00:00" updated_at = "2025-01-01T00:00:00" [emails.incoming] user_name = "third" password = "pass3" host = "imap.third.com" port = 993 use_ssl = true [emails.outgoing] user_name = "third" password = "pass3" host = "smtp.third.com" port = 465 use_ssl = true """) monkeypatch.setenv("MCP_EMAIL_SERVER_CONFIG_PATH", str(config_file)) # Override the third account (forces loop to iterate through all) monkeypatch.setenv("MCP_EMAIL_SERVER_ACCOUNT_NAME", "third") monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "envpass") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.env.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.env.com") settings = Settings() # Note: Our implementation replaces all TOML with env, so we only get 1 account assert len(settings.emails) == 1 assert settings.emails[0].account_name == "third" assert settings.emails[0].email_address == "[email protected]" def test_email_settings_masked(monkeypatch): """Test the masked() method - covers line 182.""" monkeypatch.setenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS", "[email protected]") monkeypatch.setenv("MCP_EMAIL_SERVER_PASSWORD", "secret123") monkeypatch.setenv("MCP_EMAIL_SERVER_IMAP_HOST", "imap.test.com") monkeypatch.setenv("MCP_EMAIL_SERVER_SMTP_HOST", "smtp.test.com") email = EmailSettings.from_env() assert email is not None masked = email.masked() assert masked.incoming.password == "********" # noqa: S105 assert masked.outgoing.password == "********" # noqa: S105 assert masked.email_address == "[email protected]" ``` -------------------------------------------------------------------------------- /mcp_email_server/config.py: -------------------------------------------------------------------------------- ```python from __future__ import annotations import datetime import os from pathlib import Path from typing import Any import tomli_w from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator from pydantic_settings import ( BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, TomlConfigSettingsSource, ) from mcp_email_server.log import logger DEFAILT_CONFIG_PATH = "~/.config/zerolib/mcp_email_server/config.toml" CONFIG_PATH = Path(os.getenv("MCP_EMAIL_SERVER_CONFIG_PATH", DEFAILT_CONFIG_PATH)).expanduser().resolve() class EmailServer(BaseModel): user_name: str password: str host: str port: int use_ssl: bool = True # Usually port 465 start_ssl: bool = False # Usually port 587 def masked(self) -> EmailServer: return self.model_copy(update={"password": "********"}) class AccountAttributes(BaseModel): model_config = ConfigDict(json_encoders={datetime.datetime: lambda v: v.isoformat()}) account_name: str description: str = "" created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) updated_at: datetime.datetime = Field(default_factory=datetime.datetime.now) @model_validator(mode="after") @classmethod def update_updated_at(cls, obj: AccountAttributes) -> AccountAttributes: """Update updated_at field.""" # must disable validation to avoid infinite loop obj.model_config["validate_assignment"] = False # update updated_at field obj.updated_at = datetime.datetime.now() # enable validation again obj.model_config["validate_assignment"] = True return obj def __eq__(self, other: object) -> bool: if not isinstance(other, AccountAttributes): return NotImplemented return self.model_dump(exclude={"created_at", "updated_at"}) == other.model_dump( exclude={"created_at", "updated_at"} ) @field_serializer("created_at", "updated_at") def serialize_datetime(self, v: datetime.datetime) -> str: return v.isoformat() def masked(self) -> AccountAttributes: return self.model_copy() class EmailSettings(AccountAttributes): full_name: str email_address: str incoming: EmailServer outgoing: EmailServer @classmethod def init( cls, *, account_name: str, full_name: str, email_address: str, user_name: str, password: str, imap_host: str, smtp_host: str, imap_user_name: str | None = None, imap_password: str | None = None, imap_port: int = 993, imap_ssl: bool = True, smtp_port: int = 465, smtp_ssl: bool = True, smtp_start_ssl: bool = False, smtp_user_name: str | None = None, smtp_password: str | None = None, ) -> EmailSettings: return cls( account_name=account_name, full_name=full_name, email_address=email_address, incoming=EmailServer( user_name=imap_user_name or user_name, password=imap_password or password, host=imap_host, port=imap_port, use_ssl=imap_ssl, ), outgoing=EmailServer( user_name=smtp_user_name or user_name, password=smtp_password or password, host=smtp_host, port=smtp_port, use_ssl=smtp_ssl, start_ssl=smtp_start_ssl, ), ) @classmethod def from_env(cls) -> EmailSettings | None: """Create EmailSettings from environment variables. Expected environment variables: - MCP_EMAIL_SERVER_ACCOUNT_NAME (default: "default") - MCP_EMAIL_SERVER_FULL_NAME - MCP_EMAIL_SERVER_EMAIL_ADDRESS - MCP_EMAIL_SERVER_USER_NAME - MCP_EMAIL_SERVER_PASSWORD - MCP_EMAIL_SERVER_IMAP_HOST - MCP_EMAIL_SERVER_IMAP_PORT (default: 993) - MCP_EMAIL_SERVER_IMAP_SSL (default: true) - MCP_EMAIL_SERVER_SMTP_HOST - MCP_EMAIL_SERVER_SMTP_PORT (default: 465) - MCP_EMAIL_SERVER_SMTP_SSL (default: true) - MCP_EMAIL_SERVER_SMTP_START_SSL (default: false) """ # Check if minimum required environment variables are set email_address = os.getenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS") password = os.getenv("MCP_EMAIL_SERVER_PASSWORD") if not email_address or not password: return None # Parse boolean values def parse_bool(value: str | None, default: bool = True) -> bool: if value is None: return default return value.lower() in ("true", "1", "yes", "on") # Get all environment variables with defaults account_name = os.getenv("MCP_EMAIL_SERVER_ACCOUNT_NAME", "default") full_name = os.getenv("MCP_EMAIL_SERVER_FULL_NAME", email_address.split("@")[0]) user_name = os.getenv("MCP_EMAIL_SERVER_USER_NAME", email_address) imap_host = os.getenv("MCP_EMAIL_SERVER_IMAP_HOST") smtp_host = os.getenv("MCP_EMAIL_SERVER_SMTP_HOST") # Required fields check if not imap_host or not smtp_host: logger.warning("Missing required email configuration environment variables (IMAP_HOST or SMTP_HOST)") return None try: return cls.init( account_name=account_name, full_name=full_name, email_address=email_address, user_name=user_name, password=password, imap_host=imap_host, imap_port=int(os.getenv("MCP_EMAIL_SERVER_IMAP_PORT", "993")), imap_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_IMAP_SSL"), True), smtp_host=smtp_host, smtp_port=int(os.getenv("MCP_EMAIL_SERVER_SMTP_PORT", "465")), smtp_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_SMTP_SSL"), True), smtp_start_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_SMTP_START_SSL"), False), smtp_user_name=os.getenv("MCP_EMAIL_SERVER_SMTP_USER_NAME", user_name), smtp_password=os.getenv("MCP_EMAIL_SERVER_SMTP_PASSWORD", password), imap_user_name=os.getenv("MCP_EMAIL_SERVER_IMAP_USER_NAME", user_name), imap_password=os.getenv("MCP_EMAIL_SERVER_IMAP_PASSWORD", password), ) except (ValueError, TypeError) as e: logger.error(f"Failed to create email settings from environment variables: {e}") return None def masked(self) -> EmailSettings: return self.model_copy( update={ "incoming": self.incoming.masked(), "outgoing": self.outgoing.masked(), } ) class ProviderSettings(AccountAttributes): provider_name: str api_key: str def masked(self) -> AccountAttributes: return self.model_copy(update={"api_key": "********"}) class Settings(BaseSettings): emails: list[EmailSettings] = [] providers: list[ProviderSettings] = [] db_location: str = CONFIG_PATH.with_name("db.sqlite3").as_posix() model_config = SettingsConfigDict(toml_file=CONFIG_PATH, validate_assignment=True, revalidate_instances="always") def __init__(self, **data: Any) -> None: """Initialize Settings with support for environment variables.""" super().__init__(**data) # Check for email configuration from environment variables env_email = EmailSettings.from_env() if env_email: # Check if this account already exists (from TOML) existing_account = None for i, email in enumerate(self.emails): if email.account_name == env_email.account_name: existing_account = i break if existing_account is not None: # Replace existing account with env configuration self.emails[existing_account] = env_email logger.info(f"Overriding email account '{env_email.account_name}' with environment variables") else: # Add new account from env self.emails.insert(0, env_email) logger.info(f"Added email account '{env_email.account_name}' from environment variables") def add_email(self, email: EmailSettings) -> None: """Use re-assigned for validation to work.""" self.emails = [email, *self.emails] def add_provider(self, provider: ProviderSettings) -> None: """Use re-assigned for validation to work.""" self.providers = [provider, *self.providers] def delete_email(self, account_name: str) -> None: """Use re-assigned for validation to work.""" self.emails = [email for email in self.emails if email.account_name != account_name] def delete_provider(self, account_name: str) -> None: """Use re-assigned for validation to work.""" self.providers = [provider for provider in self.providers if provider.account_name != account_name] def get_account(self, account_name: str, masked: bool = False) -> EmailSettings | ProviderSettings | None: for email in self.emails: if email.account_name == account_name: return email if not masked else email.masked() for provider in self.providers: if provider.account_name == account_name: return provider if not masked else provider.masked() return None def get_accounts(self, masked: bool = False) -> list[EmailSettings | ProviderSettings]: accounts = self.emails + self.providers if masked: return [account.masked() for account in accounts] return accounts @model_validator(mode="after") @classmethod def check_unique_account_names(cls, obj: Settings) -> Settings: account_names = set() for email in obj.emails: if email.account_name in account_names: raise ValueError(f"Duplicate account name {email.account_name}") account_names.add(email.account_name) for provider in obj.providers: if provider.account_name in account_names: raise ValueError(f"Duplicate account name {provider.account_name}") account_names.add(provider.account_name) return obj @classmethod def settings_customise_sources( cls, settings_cls: type[BaseSettings], init_settings: PydanticBaseSettingsSource, env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> tuple[PydanticBaseSettingsSource, ...]: return (TomlConfigSettingsSource(settings_cls),) def _to_toml(self) -> str: data = self.model_dump() return tomli_w.dumps(data) def store(self) -> None: toml_file = self.model_config["toml_file"] toml_file.parent.mkdir(parents=True, exist_ok=True) toml_file.write_text(self._to_toml()) logger.info(f"Settings stored in {toml_file}") _settings = None def get_settings(reload: bool = False) -> Settings: global _settings if not _settings or reload: logger.info(f"Loading settings from {CONFIG_PATH}") _settings = Settings() return _settings def store_settings(settings: Settings | None = None) -> None: if not settings: settings = get_settings() settings.store() return def delete_settings() -> None: if not CONFIG_PATH.exists(): logger.info(f"Settings file {CONFIG_PATH} does not exist") return CONFIG_PATH.unlink() logger.info(f"Deleted settings file {CONFIG_PATH}") ``` -------------------------------------------------------------------------------- /mcp_email_server/ui.py: -------------------------------------------------------------------------------- ```python import gradio as gr from mcp_email_server.config import EmailSettings, get_settings, store_settings from mcp_email_server.tools.installer import install_claude_desktop, is_installed, need_update, uninstall_claude_desktop def create_ui(): # noqa: C901 # Create a Gradio interface with gr.Blocks(title="Email Settings Configuration") as app: gr.Markdown("# Email Settings Configuration") # Function to get current accounts def get_current_accounts(): settings = get_settings(reload=True) email_accounts = [email.account_name for email in settings.emails] return email_accounts # Function to update account list display def update_account_list(): settings = get_settings(reload=True) email_accounts = [email.account_name for email in settings.emails] if email_accounts: # Create a detailed list of accounts with more information accounts_details = [] for email in settings.emails: details = [ f"**Account Name:** {email.account_name}", f"**Full Name:** {email.full_name}", f"**Email Address:** {email.email_address}", ] if hasattr(email, "description") and email.description: details.append(f"**Description:** {email.description}") # Add IMAP/SMTP provider info if available if hasattr(email, "incoming") and hasattr(email.incoming, "host"): details.append(f"**IMAP Provider:** {email.incoming.host}") if hasattr(email, "outgoing") and hasattr(email.outgoing, "host"): details.append(f"**SMTP Provider:** {email.outgoing.host}") accounts_details.append("### " + email.account_name + "\n" + "\n".join(details) + "\n") accounts_md = "\n".join(accounts_details) return ( f"## Configured Accounts\n{accounts_md}", gr.update(choices=email_accounts, value=None), gr.update(visible=True), ) else: return ( "No email accounts configured yet.", gr.update(choices=[], value=None), gr.update(visible=False), ) # Display current email accounts and allow deletion with gr.Accordion("Current Email Accounts", open=True): # Display the list of accounts accounts_display = gr.Markdown("") # Create a dropdown to select account to delete account_to_delete = gr.Dropdown(choices=[], label="Select Account to Delete", interactive=True) # Status message for deletion delete_status = gr.Markdown("") # Delete button delete_btn = gr.Button("Delete Selected Account") # Function to delete an account def delete_email_account(account_name): if not account_name: return "Error: Please select an account to delete.", *update_account_list() try: # Get current settings settings = get_settings() # Delete the account settings.delete_email(account_name) # Store settings store_settings(settings) # Return success message and update the UI return f"Success: Email account '{account_name}' has been deleted.", *update_account_list() except Exception as e: return f"Error: {e!s}", *update_account_list() # Connect the delete button to the delete function delete_btn.click( fn=delete_email_account, inputs=[account_to_delete], outputs=[delete_status, accounts_display, account_to_delete, delete_btn], ) # Initialize the account list app.load( fn=update_account_list, inputs=None, outputs=[accounts_display, account_to_delete, delete_btn], ) # Form for adding a new email account with gr.Accordion("Add New Email Account", open=True): gr.Markdown("### Add New Email Account") # Basic account information account_name = gr.Textbox(label="Account Name", placeholder="e.g. work_email") full_name = gr.Textbox(label="Full Name", placeholder="e.g. John Doe") email_address = gr.Textbox(label="Email Address", placeholder="e.g. [email protected]") # Credentials user_name = gr.Textbox(label="Username", placeholder="e.g. [email protected]") password = gr.Textbox(label="Password", type="password") # IMAP settings with gr.Row(): with gr.Column(): gr.Markdown("### IMAP Settings") imap_host = gr.Textbox(label="IMAP Host", placeholder="e.g. imap.example.com") imap_port = gr.Number(label="IMAP Port", value=993) imap_ssl = gr.Checkbox(label="Use SSL", value=True) imap_user_name = gr.Textbox( label="IMAP Username (optional)", placeholder="Leave empty to use the same as above" ) imap_password = gr.Textbox( label="IMAP Password (optional)", type="password", placeholder="Leave empty to use the same as above", ) # SMTP settings with gr.Column(): gr.Markdown("### SMTP Settings") smtp_host = gr.Textbox(label="SMTP Host", placeholder="e.g. smtp.example.com") smtp_port = gr.Number(label="SMTP Port", value=465) smtp_ssl = gr.Checkbox(label="Use SSL", value=True) smtp_start_ssl = gr.Checkbox(label="Start SSL", value=False) smtp_user_name = gr.Textbox( label="SMTP Username (optional)", placeholder="Leave empty to use the same as above" ) smtp_password = gr.Textbox( label="SMTP Password (optional)", type="password", placeholder="Leave empty to use the same as above", ) # Status message status_message = gr.Markdown("") # Save button save_btn = gr.Button("Save Email Settings") # Function to save settings def save_email_settings( account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ): try: # Validate required fields if not account_name or not full_name or not email_address or not user_name or not password: # Get account list update account_md, account_choices, btn_visible = update_account_list() return ( "Error: Please fill in all required fields.", account_md, account_choices, btn_visible, account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ) if not imap_host or not smtp_host: # Get account list update account_md, account_choices, btn_visible = update_account_list() return ( "Error: IMAP and SMTP hosts are required.", account_md, account_choices, btn_visible, account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ) # Get current settings settings = get_settings() # Check if account name already exists for email in settings.emails: if email.account_name == account_name: # Get account list update account_md, account_choices, btn_visible = update_account_list() return ( f"Error: Account name '{account_name}' already exists.", account_md, account_choices, btn_visible, account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ) # Create new email settings email_settings = EmailSettings.init( account_name=account_name, full_name=full_name, email_address=email_address, user_name=user_name, password=password, imap_host=imap_host, smtp_host=smtp_host, imap_port=int(imap_port), imap_ssl=imap_ssl, smtp_port=int(smtp_port), smtp_ssl=smtp_ssl, smtp_start_ssl=smtp_start_ssl, imap_user_name=imap_user_name if imap_user_name else None, imap_password=imap_password if imap_password else None, smtp_user_name=smtp_user_name if smtp_user_name else None, smtp_password=smtp_password if smtp_password else None, ) # Add to settings settings.add_email(email_settings) # Store settings store_settings(settings) # Get account list update account_md, account_choices, btn_visible = update_account_list() # Return success message, update the UI, and clear form fields return ( f"Success: Email account '{account_name}' has been added.", account_md, account_choices, btn_visible, "", # Clear account_name "", # Clear full_name "", # Clear email_address "", # Clear user_name "", # Clear password "", # Clear imap_host 993, # Reset imap_port True, # Reset imap_ssl "", # Clear imap_user_name "", # Clear imap_password "", # Clear smtp_host 465, # Reset smtp_port True, # Reset smtp_ssl False, # Reset smtp_start_ssl "", # Clear smtp_user_name "", # Clear smtp_password ) except Exception as e: # Get account list update account_md, account_choices, btn_visible = update_account_list() return ( f"Error: {e!s}", account_md, account_choices, btn_visible, account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ) # Connect the save button to the save function save_btn.click( fn=save_email_settings, inputs=[ account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ], outputs=[ status_message, accounts_display, account_to_delete, delete_btn, account_name, full_name, email_address, user_name, password, imap_host, imap_port, imap_ssl, imap_user_name, imap_password, smtp_host, smtp_port, smtp_ssl, smtp_start_ssl, smtp_user_name, smtp_password, ], ) # Claude Desktop Integration with gr.Accordion("Claude Desktop Integration", open=True): gr.Markdown("### Claude Desktop Integration") # Status display for Claude Desktop integration claude_status = gr.Markdown("") # Function to check and update Claude Desktop status def update_claude_status(): if is_installed(): if need_update(): return "Claude Desktop integration is installed but needs to be updated." else: return "Claude Desktop integration is installed and up to date." else: return "Claude Desktop integration is not installed." # Buttons for Claude Desktop actions with gr.Row(): install_update_btn = gr.Button("Install to Claude Desktop") uninstall_btn = gr.Button("Uninstall from Claude Desktop") # Functions for Claude Desktop actions def install_or_update_claude(): try: install_claude_desktop() status = update_claude_status() # Update button states based on new status is_inst = is_installed() needs_upd = need_update() button_text = "Update Claude Desktop" if (is_inst and needs_upd) else "Install to Claude Desktop" button_interactive = not (is_inst and not needs_upd) return [ status, gr.update(value=button_text, interactive=button_interactive), gr.update(interactive=is_inst), ] except Exception as e: return [f"Error installing/updating Claude Desktop: {e!s}", gr.update(), gr.update()] def uninstall_from_claude(): try: uninstall_claude_desktop() status = update_claude_status() # Update button states based on new status is_inst = is_installed() needs_upd = need_update() button_text = "Update Claude Desktop" if (is_inst and needs_upd) else "Install to Claude Desktop" button_interactive = not (is_inst and not needs_upd) return [ status, gr.update(value=button_text, interactive=button_interactive), gr.update(interactive=is_inst), ] except Exception as e: return [f"Error uninstalling from Claude Desktop: {e!s}", gr.update(), gr.update()] # Function to update button states based on installation status def update_button_states(): status = update_claude_status() is_inst = is_installed() needs_upd = need_update() button_text = "Update Claude Desktop" if (is_inst and needs_upd) else "Install to Claude Desktop" button_interactive = not (is_inst and not needs_upd) return [ status, gr.update(value=button_text, interactive=button_interactive), gr.update(interactive=is_inst), ] # Connect buttons to functions install_update_btn.click( fn=install_or_update_claude, inputs=[], outputs=[claude_status, install_update_btn, uninstall_btn] ) uninstall_btn.click( fn=uninstall_from_claude, inputs=[], outputs=[claude_status, install_update_btn, uninstall_btn] ) # Initialize Claude Desktop status and button states app.load(fn=update_button_states, inputs=None, outputs=[claude_status, install_update_btn, uninstall_btn]) return app def main(): app = create_ui() app.launch(inbrowser=True) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /mcp_email_server/emails/classic.py: -------------------------------------------------------------------------------- ```python import email.utils from collections.abc import AsyncGenerator from datetime import datetime from email.header import Header from email.mime.text import MIMEText from email.parser import BytesParser from email.policy import default from typing import Any import aioimaplib import aiosmtplib from mcp_email_server.config import EmailServer, EmailSettings from mcp_email_server.emails import EmailHandler from mcp_email_server.emails.models import ( EmailBodyResponse, EmailContentBatchResponse, EmailMetadata, EmailMetadataPageResponse, ) from mcp_email_server.log import logger class EmailClient: def __init__(self, email_server: EmailServer, sender: str | None = None): self.email_server = email_server self.sender = sender or email_server.user_name self.imap_class = aioimaplib.IMAP4_SSL if self.email_server.use_ssl else aioimaplib.IMAP4 self.smtp_use_tls = self.email_server.use_ssl self.smtp_start_tls = self.email_server.start_ssl def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> dict[str, Any]: # noqa: C901 """Parse raw email data into a structured dictionary.""" parser = BytesParser(policy=default) email_message = parser.parsebytes(raw_email) # Extract email parts subject = email_message.get("Subject", "") sender = email_message.get("From", "") date_str = email_message.get("Date", "") # Extract recipients to_addresses = [] to_header = email_message.get("To", "") if to_header: # Simple parsing - split by comma and strip whitespace to_addresses = [addr.strip() for addr in to_header.split(",")] # Also check CC recipients cc_header = email_message.get("Cc", "") if cc_header: to_addresses.extend([addr.strip() for addr in cc_header.split(",")]) # Parse date try: date_tuple = email.utils.parsedate_tz(date_str) date = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) if date_tuple else datetime.now() except Exception: date = datetime.now() # Get body content body = "" attachments = [] if email_message.is_multipart(): for part in email_message.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition", "")) # Handle attachments if "attachment" in content_disposition: filename = part.get_filename() if filename: attachments.append(filename) # Handle text parts elif content_type == "text/plain": body_part = part.get_payload(decode=True) if body_part: charset = part.get_content_charset("utf-8") try: body += body_part.decode(charset) except UnicodeDecodeError: body += body_part.decode("utf-8", errors="replace") else: # Handle plain text emails payload = email_message.get_payload(decode=True) if payload: charset = email_message.get_content_charset("utf-8") try: body = payload.decode(charset) except UnicodeDecodeError: body = payload.decode("utf-8", errors="replace") # TODO: Allow retrieving full email body if body and len(body) > 20000: body = body[:20000] + "...[TRUNCATED]" return { "email_id": email_id or "", "subject": subject, "from": sender, "to": to_addresses, "body": body, "date": date, "attachments": attachments, } @staticmethod def _build_search_criteria( before: datetime | None = None, since: datetime | None = None, subject: str | None = None, body: str | None = None, text: str | None = None, from_address: str | None = None, to_address: str | None = None, ): search_criteria = [] if before: search_criteria.extend(["BEFORE", before.strftime("%d-%b-%Y").upper()]) if since: search_criteria.extend(["SINCE", since.strftime("%d-%b-%Y").upper()]) if subject: search_criteria.extend(["SUBJECT", subject]) if body: search_criteria.extend(["BODY", body]) if text: search_criteria.extend(["TEXT", text]) if from_address: search_criteria.extend(["FROM", from_address]) if to_address: search_criteria.extend(["TO", to_address]) # If no specific criteria, search for ALL if not search_criteria: search_criteria = ["ALL"] return search_criteria async def get_email_count( self, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, ) -> int: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) await imap.select("INBOX") search_criteria = self._build_search_criteria( before, since, subject, from_address=from_address, to_address=to_address ) logger.info(f"Count: Search criteria: {search_criteria}") # Search for messages and count them - use UID SEARCH for consistency _, messages = await imap.uid_search(*search_criteria) return len(messages[0].split()) finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") async def get_emails_metadata_stream( # noqa: C901 self, page: int = 1, page_size: int = 10, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, order: str = "desc", ) -> AsyncGenerator[dict[str, Any], None]: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) try: await imap.id(name="mcp-email-server", version="1.0.0") except Exception as e: logger.warning(f"IMAP ID command failed: {e!s}") await imap.select("INBOX") search_criteria = self._build_search_criteria( before, since, subject, from_address=from_address, to_address=to_address ) logger.info(f"Get metadata: Search criteria: {search_criteria}") # Search for messages - use UID SEARCH for better compatibility _, messages = await imap.uid_search(*search_criteria) # Handle empty or None responses if not messages or not messages[0]: logger.warning("No messages returned from search") email_ids = [] else: email_ids = messages[0].split() logger.info(f"Found {len(email_ids)} email IDs") start = (page - 1) * page_size end = start + page_size if order == "desc": email_ids.reverse() # Fetch each message's metadata only for _, email_id in enumerate(email_ids[start:end]): try: # Convert email_id from bytes to string email_id_str = email_id.decode("utf-8") # Fetch only headers to get metadata without body _, data = await imap.uid("fetch", email_id_str, "BODY.PEEK[HEADER]") if not data: logger.error(f"Failed to fetch headers for UID {email_id_str}") continue # Find the email headers in the response raw_headers = None if len(data) > 1 and isinstance(data[1], bytearray): raw_headers = bytes(data[1]) else: # Search through all items for header content for item in data: if isinstance(item, bytes | bytearray) and len(item) > 10: # Skip IMAP protocol responses if isinstance(item, bytes) and b"FETCH" in item: continue # This is likely the header content raw_headers = bytes(item) if isinstance(item, bytearray) else item break if raw_headers: try: # Parse headers only parser = BytesParser(policy=default) email_message = parser.parsebytes(raw_headers) # Extract metadata subject = email_message.get("Subject", "") sender = email_message.get("From", "") date_str = email_message.get("Date", "") # Extract recipients to_addresses = [] to_header = email_message.get("To", "") if to_header: to_addresses = [addr.strip() for addr in to_header.split(",")] cc_header = email_message.get("Cc", "") if cc_header: to_addresses.extend([addr.strip() for addr in cc_header.split(",")]) # Parse date try: date_tuple = email.utils.parsedate_tz(date_str) date = ( datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) if date_tuple else datetime.now() ) except Exception: date = datetime.now() # For metadata, we don't fetch attachments to save bandwidth # We'll mark it as unknown for now metadata = { "email_id": email_id_str, "subject": subject, "from": sender, "to": to_addresses, "date": date, "attachments": [], # We don't fetch attachment info for metadata } yield metadata except Exception as e: # Log error but continue with other emails logger.error(f"Error parsing email metadata: {e!s}") else: logger.error(f"Could not find header data in response for email ID: {email_id_str}") except Exception as e: logger.error(f"Error fetching email metadata {email_id}: {e!s}") finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") def _check_email_content(self, data: list) -> bool: """Check if the fetched data contains actual email content.""" for item in data: if isinstance(item, bytes) and b"FETCH (" in item and b"RFC822" not in item and b"BODY" not in item: # This is just metadata, not actual content continue elif isinstance(item, bytes | bytearray) and len(item) > 100: # This looks like email content return True return False def _extract_raw_email(self, data: list) -> bytes | None: """Extract raw email bytes from IMAP response data.""" # The email content is typically at index 1 as a bytearray if len(data) > 1 and isinstance(data[1], bytearray): return bytes(data[1]) # Search through all items for email content for item in data: if isinstance(item, bytes | bytearray) and len(item) > 100: # Skip IMAP protocol responses if isinstance(item, bytes) and b"FETCH" in item: continue # This is likely the email content return bytes(item) if isinstance(item, bytearray) else item return None async def _fetch_email_with_formats(self, imap, email_id: str) -> list | None: """Try different fetch formats to get email data.""" fetch_formats = ["RFC822", "BODY[]", "BODY.PEEK[]", "(BODY.PEEK[])"] for fetch_format in fetch_formats: try: _, data = await imap.uid("fetch", email_id, fetch_format) if data and len(data) > 0 and self._check_email_content(data): return data except Exception as e: logger.debug(f"Fetch format {fetch_format} failed: {e}") return None async def get_email_body_by_id(self, email_id: str) -> dict[str, Any] | None: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) try: await imap.id(name="mcp-email-server", version="1.0.0") except Exception as e: logger.warning(f"IMAP ID command failed: {e!s}") await imap.select("INBOX") # Fetch the specific email by UID data = await self._fetch_email_with_formats(imap, email_id) if not data: logger.error(f"Failed to fetch UID {email_id} with any format") return None # Extract raw email data raw_email = self._extract_raw_email(data) if not raw_email: logger.error(f"Could not find email data in response for email ID: {email_id}") return None # Parse the email try: return self._parse_email_data(raw_email, email_id) except Exception as e: logger.error(f"Error parsing email: {e!s}") return None finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") async def send_email( self, recipients: list[str], subject: str, body: str, cc: list[str] | None = None, bcc: list[str] | None = None, html: bool = False, ): # Create message with UTF-8 encoding to support special characters content_type = "html" if html else "plain" msg = MIMEText(body, content_type, "utf-8") # Handle subject with special characters if any(ord(c) > 127 for c in subject): msg["Subject"] = Header(subject, "utf-8") else: msg["Subject"] = subject # Handle sender name with special characters if any(ord(c) > 127 for c in self.sender): msg["From"] = Header(self.sender, "utf-8") else: msg["From"] = self.sender msg["To"] = ", ".join(recipients) # Add CC header if provided (visible to recipients) if cc: msg["Cc"] = ", ".join(cc) # Note: BCC recipients are not added to headers (they remain hidden) # but will be included in the actual recipients for SMTP delivery async with aiosmtplib.SMTP( hostname=self.email_server.host, port=self.email_server.port, start_tls=self.smtp_start_tls, use_tls=self.smtp_use_tls, ) as smtp: await smtp.login(self.email_server.user_name, self.email_server.password) # Create a combined list of all recipients for delivery all_recipients = recipients.copy() if cc: all_recipients.extend(cc) if bcc: all_recipients.extend(bcc) await smtp.send_message(msg, recipients=all_recipients) class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): self.email_settings = email_settings self.incoming_client = EmailClient(email_settings.incoming) self.outgoing_client = EmailClient( email_settings.outgoing, sender=f"{email_settings.full_name} <{email_settings.email_address}>", ) async def get_emails_metadata( self, page: int = 1, page_size: int = 10, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, order: str = "desc", ) -> EmailMetadataPageResponse: emails = [] async for email_data in self.incoming_client.get_emails_metadata_stream( page, page_size, before, since, subject, from_address, to_address, order ): emails.append(EmailMetadata.from_email(email_data)) total = await self.incoming_client.get_email_count( before, since, subject, from_address=from_address, to_address=to_address ) return EmailMetadataPageResponse( page=page, page_size=page_size, before=before, since=since, subject=subject, emails=emails, total=total, ) async def get_emails_content(self, email_ids: list[str]) -> EmailContentBatchResponse: """Batch retrieve email body content""" emails = [] failed_ids = [] for email_id in email_ids: try: email_data = await self.incoming_client.get_email_body_by_id(email_id) if email_data: emails.append( EmailBodyResponse( email_id=email_data["email_id"], subject=email_data["subject"], sender=email_data["from"], recipients=email_data["to"], date=email_data["date"], body=email_data["body"], attachments=email_data["attachments"], ) ) else: failed_ids.append(email_id) except Exception as e: logger.error(f"Failed to retrieve email {email_id}: {e}") failed_ids.append(email_id) return EmailContentBatchResponse( emails=emails, requested_count=len(email_ids), retrieved_count=len(emails), failed_ids=failed_ids, ) async def send_email( self, recipients: list[str], subject: str, body: str, cc: list[str] | None = None, bcc: list[str] | None = None, html: bool = False, ) -> None: await self.outgoing_client.send_email(recipients, subject, body, cc, bcc, html) ```