This is page 1 of 2. Use http://codebase.md/ai-zerolab/mcp-toolbox?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── actions │ │ └── setup-python-env │ │ └── action.yml │ └── workflows │ ├── main.yml │ ├── on-release-main.yml │ └── validate-codecov-config.yml ├── .gitignore ├── .pre-commit-config.yaml ├── .vscode │ └── settings.json ├── codecov.yaml ├── CONTRIBUTING.md ├── Dockerfile ├── docs │ ├── index.md │ └── modules.md ├── generate_config_template.py ├── LICENSE ├── llms.txt ├── Makefile ├── mcp_toolbox │ ├── __init__.py │ ├── app.py │ ├── audio │ │ ├── __init__.py │ │ └── tools.py │ ├── cli.py │ ├── command_line │ │ ├── __init__.py │ │ └── tools.py │ ├── config.py │ ├── enhance │ │ ├── __init__.py │ │ ├── memory.py │ │ └── tools.py │ ├── figma │ │ ├── __init__.py │ │ └── tools.py │ ├── file_ops │ │ ├── __init__.py │ │ └── tools.py │ ├── flux │ │ ├── __init__.py │ │ ├── api.py │ │ └── tools.py │ ├── log.py │ ├── markitdown │ │ ├── __init__.py │ │ └── tools.py │ ├── web │ │ ├── __init__.py │ │ └── tools.py │ └── xiaoyuzhoufm │ ├── __init__.py │ └── tools.py ├── mkdocs.yml ├── pyproject.toml ├── pytest.ini ├── README.md ├── smithery.yaml ├── tests │ ├── audio │ │ └── test_audio_tools.py │ ├── command_line │ │ └── test_command_line_tools.py │ ├── enhance │ │ ├── test_enhance_tools.py │ │ └── test_memory.py │ ├── figma │ │ └── test_figma_tools.py │ ├── file_ops │ │ └── test_file_ops_tools.py │ ├── flux │ │ └── test_flux_tools.py │ ├── markitdown │ │ └── test_markitdown_tools.py │ ├── mock │ │ └── figma │ │ ├── delete_comment.json │ │ ├── get_comments.json │ │ ├── get_component.json │ │ ├── get_file_components.json │ │ ├── get_file_nodes.json │ │ ├── get_file_styles.json │ │ ├── get_file.json │ │ ├── get_image_fills.json │ │ ├── get_image.json │ │ ├── get_project_files.json │ │ ├── get_style.json │ │ ├── get_team_component_sets.json │ │ ├── get_team_components.json │ │ ├── get_team_projects.json │ │ ├── get_team_styles.json 
│ │ └── post_comment.json │ ├── web │ │ └── test_web_tools.py │ └── xiaoyuzhoufm │ └── test_xiaoyuzhoufm_tools.py ├── tox.ini └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.pre-commit-config.yaml: -------------------------------------------------------------------------------- ```yaml repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: "v5.0.0" hooks: - id: check-case-conflict - id: check-merge-conflict - id: check-toml - id: check-yaml - id: check-json exclude: ^.devcontainer/devcontainer.json - id: pretty-format-json exclude: ^.devcontainer/devcontainer.json args: [--autofix] - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/executablebooks/mdformat rev: 0.7.22 hooks: - id: mdformat additional_dependencies: [mdformat-gfm, mdformat-frontmatter, mdformat-footnote] - repo: https://github.com/astral-sh/ruff-pre-commit rev: "v0.11.8" hooks: - id: ruff args: [--exit-non-zero-on-fix] - id: ruff-format ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` docs/source # From https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. 
*.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # Vscode config files # .vscode/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # mcp-toolbox [](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox) [](https://github.com/ai-zerolab/mcp-toolbox/actions/workflows/main.yml?query=branch%3Amain) [](https://codecov.io/gh/ai-zerolab/mcp-toolbox) [](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox) [](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox) A comprehensive toolkit for enhancing LLM capabilities through the Model Context Protocol (MCP). This package provides a collection of tools that allow LLMs to interact with external services and APIs, extending their functionality beyond text generation. - **GitHub repository**: <https://github.com/ai-zerolab/mcp-toolbox/> - (WIP)**Documentation**: <https://ai-zerolab.github.io/mcp-toolbox/> ## Features > \*nix is our main target, but Windows should work too. - **Command Line Execution**: Execute any command line instruction through LLM - **Figma Integration**: Access Figma files, components, styles, and more - **Extensible Architecture**: Easily add new API integrations - **MCP Protocol Support**: Compatible with Claude Desktop and other MCP-enabled LLMs - **Comprehensive Testing**: Well-tested codebase with high test coverage ## Installation ### Using uv (Recommended) We recommend using [uv](https://github.com/astral-sh/uv) to manage your environment. ```bash # Install uv curl -LsSf https://astral.sh/uv/install.sh | sh # For macOS/Linux # or powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" # For Windows ``` Then you can use `uvx "mcp-toolbox@latest" stdio` as the command for running the latest version of the MCP server. 
**Audio and memory tools are not included in the default installation.** You can include them by installing the `all` extra: > [audio] for audio tools, [memory] for memory tools, [all] for all tools ```bash uvx "mcp-toolbox[all]@latest" stdio ``` ### Installing via Smithery To install Toolbox for LLM Enhancement for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ai-zerolab/mcp-toolbox): ```bash npx -y @smithery/cli install @ai-zerolab/mcp-toolbox --client claude ``` ### Using pip ```bash pip install "mcp-toolbox[all]" ``` Then you can use `mcp-toolbox stdio` as the command for running the MCP server. ## Configuration ### Environment Variables The following environment variables can be configured: - `FIGMA_API_KEY`: API key for Figma integration - `TAVILY_API_KEY`: API key for Tavily integration - `DUCKDUCKGO_API_KEY`: API key for DuckDuckGo integration - `BFL_API_KEY`: API key for Flux image generation API ### Memory Storage Memory tools store data in the following locations: - **macOS**: `~/Documents/zerolab/mcp-toolbox/memory` (syncs across devices via iCloud) - **Other platforms**: `~/.zerolab/mcp-toolbox/memory` ### Full Configuration To use mcp-toolbox with Claude Desktop/Cline/Cursor/..., add the following to your configuration file: ```json { "mcpServers": { "zerolab-toolbox": { "command": "uvx", "args": ["--prerelease=allow", "mcp-toolbox@latest", "stdio"], "env": { "FIGMA_API_KEY": "your-figma-api-key", "TAVILY_API_KEY": "your-tavily-api-key", "DUCKDUCKGO_API_KEY": "your-duckduckgo-api-key", "BFL_API_KEY": "your-bfl-api-key" } } } } ``` For full features: ```json { "mcpServers": { "zerolab-toolbox": { "command": "uvx", "args": [ "--prerelease=allow", "--python=3.12", "mcp-toolbox[all]@latest", "stdio" ], "env": { "FIGMA_API_KEY": "your-figma-api-key", "TAVILY_API_KEY": "your-tavily-api-key", "DUCKDUCKGO_API_KEY": "your-duckduckgo-api-key", "BFL_API_KEY": "your-bfl-api-key" } } } } ``` You can generate a debug configuration template 
using: ```bash uv run generate_config_template.py ``` ## Available Tools ### Command Line Tools | Tool | Description | | ----------------- | ---------------------------------- | | `execute_command` | Execute a command line instruction | ### File Operations Tools | Tool | Description | | -------------------- | --------------------------------------------------- | | `read_file_content` | Read content from a file | | `write_file_content` | Write content to a file | | `replace_in_file` | Replace content in a file using regular expressions | | `list_directory` | List directory contents with detailed information | ### Figma Tools | Tool | Description | | ------------------------------- | ---------------------------------------- | | `figma_get_file` | Get a Figma file by key | | `figma_get_file_nodes` | Get specific nodes from a Figma file | | `figma_get_image` | Get images for nodes in a Figma file | | `figma_get_image_fills` | Get URLs for images used in a Figma file | | `figma_get_comments` | Get comments on a Figma file | | `figma_post_comment` | Post a comment on a Figma file | | `figma_delete_comment` | Delete a comment from a Figma file | | `figma_get_team_projects` | Get projects for a team | | `figma_get_project_files` | Get files for a project | | `figma_get_team_components` | Get components for a team | | `figma_get_file_components` | Get components from a file | | `figma_get_component` | Get a component by key | | `figma_get_team_component_sets` | Get component sets for a team | | `figma_get_team_styles` | Get styles for a team | | `figma_get_file_styles` | Get styles from a file | | `figma_get_style` | Get a style by key | ### XiaoyuZhouFM Tools | Tool | Description | | ----------------------- | ------------------------------------------------------------------------------------------ | | `xiaoyuzhoufm_download` | Download a podcast episode from XiaoyuZhouFM with optional automatic m4a to mp3 conversion | ### Audio Tools | Tool | Description | | 
------------------ | ---------------------------------------------------------------- | | `get_audio_length` | Get the length of an audio file in seconds | | `get_audio_text` | Get transcribed text from a specific time range in an audio file | ### Memory Tools | Tool | Description | | ---------------- | ----------------------------------------------------------------------- | | `think` | Use the tool to think about something and append the thought to the log | | `get_session_id` | Get the current session ID | | `remember` | Store a memory (brief and detail) in the memory database | | `recall` | Query memories from the database with semantic search | | `forget` | Clear all memories in the memory database | ### Markitdown Tools | Tool | Description | | -------------------------- | --------------------------------------------- | | `convert_file_to_markdown` | Convert any file to Markdown using MarkItDown | | `convert_url_to_markdown` | Convert a URL to Markdown using MarkItDown | ### Web Tools | Tool | Description | | ------------------------ | -------------------------------------------------- | | `get_html` | Get HTML content from a URL | | `save_html` | Save HTML from a URL to a file | | `search_with_tavily` | Search the web using Tavily (requires API key) | | `search_with_duckduckgo` | Search the web using DuckDuckGo (requires API key) | ### Flux Image Generation Tools | Tool | Description | | --------------------- | ---------------------------------------------------------- | | `flux_generate_image` | Generate an image using the Flux API and save it to a file | ## Usage Examples ### Running the MCP Server ```bash # Run with stdio transport (default) mcp-toolbox stdio # Run with SSE transport mcp-toolbox sse --host localhost --port 9871 ``` ### Using with Claude Desktop 1. Configure Claude Desktop as shown in the Configuration section 1. Start Claude Desktop 1. Ask Claude to interact with Figma files: - "Can you get information about this Figma file: 12345abcde?" 
- "Show me the components in this Figma file: 12345abcde" - "Get the comments from this Figma file: 12345abcde" 1. Ask Claude to execute command line instructions: - "What files are in the current directory?" - "What's the current system time?" - "Show me the contents of a specific file." 1. Ask Claude to download podcasts from XiaoyuZhouFM: - "Download this podcast episode: https://www.xiaoyuzhoufm.com/episode/67c3d80fb0167b8db9e3ec0f" - "Download and convert to MP3 this podcast: https://www.xiaoyuzhoufm.com/episode/67c3d80fb0167b8db9e3ec0f" 1. Ask Claude to work with audio files: - "What's the length of this audio file: audio.m4a?" - "Transcribe the audio from 60 to 90 seconds in audio.m4a" - "Get the text from 2:30 to 3:00 in the audio file" 1. Ask Claude to convert files or URLs to Markdown: - "Convert this file to Markdown: document.docx" - "Convert this webpage to Markdown: https://example.com" 1. Ask Claude to work with web content: - "Get the HTML content from https://example.com" - "Save the HTML from https://example.com to a file" - "Search the web for 'artificial intelligence news'" 1. Ask Claude to generate images with Flux: - "Generate an image of a beautiful sunset over mountains" - "Create an image of a futuristic city and save it to my desktop" - "Generate a portrait of a cat in a space suit" 1. Ask Claude to use memory tools: - "Remember this important fact: The capital of France is Paris" - "What's my current session ID?" - "Recall any information about France" - "Think about the implications of climate change" - "Forget all stored memories" ## Development ### Local Setup Fork the repository and clone it to your local machine. 
```bash # Install in development mode make install # Activate a virtual environment source .venv/bin/activate # For macOS/Linux # or .venv\Scripts\activate # For Windows ``` ### Running Tests ```bash make test ``` ### Running Checks ```bash make check ``` ### Building Documentation ```bash make docs ``` ## Adding New Tools To add a new API integration: 1. Update `config.py` with any required API keys 1. Create a new module in `mcp_toolbox/` 1. Implement your API client and tools 1. Add tests for your new functionality 1. Update the README.md with new environment variables and tools See the [development guide](llms.txt) for more detailed instructions. ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. 1. Fork the repository 1. Create a feature branch (`git checkout -b feature/amazing-feature`) 1. Commit your changes (`git commit -m 'Add some amazing feature'`) 1. Push to the branch (`git push origin feature/amazing-feature`) 1. Open a Pull Request ## License This project is licensed under the terms of the license included in the repository. ``` -------------------------------------------------------------------------------- /CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown # Contributing to `mcp-toolbox` Contributions are welcome, and they are greatly appreciated! Every little bit helps, and credit will always be given. You can contribute in many ways: # Types of Contributions ## Report Bugs Report bugs at https://github.com/ai-zerolab/mcp-toolbox/issues If you are reporting a bug, please include: - Your operating system name and version. - Any details about your local setup that might be helpful in troubleshooting. - Detailed steps to reproduce the bug. ## Fix Bugs Look through the GitHub issues for bugs. Anything tagged with "bug" and "help wanted" is open to whoever wants to implement a fix for it. ## Implement Features Look through the GitHub issues for features. 
Anything tagged with "enhancement" and "help wanted" is open to whoever wants to implement it. ## Write Documentation mcp-toolbox could always use more documentation, whether as part of the official docs, in docstrings, or even on the web in blog posts, articles, and such. ## Submit Feedback The best way to send feedback is to file an issue at https://github.com/ai-zerolab/mcp-toolbox/issues. If you are proposing a new feature: - Explain in detail how it would work. - Keep the scope as narrow as possible, to make it easier to implement. - Remember that this is a volunteer-driven project, and that contributions are welcome :) # Get Started! ## Installing locally Ready to contribute? Here's how to set up `mcp-toolbox` for local development. Please note this documentation assumes you already have `uv` and `Git` installed and ready to go. 1. Fork the `mcp-toolbox` repo on GitHub. 1. Clone your fork locally: ```bash cd <directory_in_which_repo_should_be_created> git clone git@github.com:YOUR_NAME/mcp-toolbox.git ``` 3. Now we need to install the environment. Navigate into the directory ```bash cd mcp-toolbox ``` Then, install and activate the environment with: ```bash make install ``` 4. Install pre-commit to run linters/formatters at commit time: ```bash uv run pre-commit install ``` 5. Create a branch for local development: ```bash git checkout -b name-of-your-bugfix-or-feature ``` Now you can make your changes locally. Don't forget to add test cases for your added functionality to the `tests` directory. ## After making your changes When you're done making changes, check that your changes pass the formatting tests. ```bash make check ``` Now, validate that all unit tests are passing: ```bash make test ``` Before raising a pull request you should also run tox. This will run the tests across different versions of Python: ```bash tox ``` This requires you to have multiple versions of python installed. 
This step is also triggered in the CI/CD pipeline, so you could also choose to skip this step locally. ## Commit your changes and push your branch to GitHub: ```bash git add . git commit -m "Your detailed description of your changes." git push origin name-of-your-bugfix-or-feature ``` Submit a pull request through the GitHub website. # Pull Request Guidelines Before you submit a pull request, check that it meets these guidelines: 1. The pull request should include tests. 1. If the pull request adds functionality, the docs should be updated. Put your new functionality into a function with a docstring, and add the feature to the list in `README.md`. ``` -------------------------------------------------------------------------------- /docs/modules.md: -------------------------------------------------------------------------------- ```markdown ``` -------------------------------------------------------------------------------- /mcp_toolbox/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_toolbox/figma/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_toolbox/markitdown/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_toolbox/web/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/mock/figma/delete_comment.json: -------------------------------------------------------------------------------- ```json { "success": true } ``` -------------------------------------------------------------------------------- 
/mcp_toolbox/audio/__init__.py: -------------------------------------------------------------------------------- ```python """Audio processing tools.""" ``` -------------------------------------------------------------------------------- /mcp_toolbox/flux/__init__.py: -------------------------------------------------------------------------------- ```python """Flux API image generation module.""" ``` -------------------------------------------------------------------------------- /mcp_toolbox/command_line/__init__.py: -------------------------------------------------------------------------------- ```python """Command line tools for MCP-Toolbox.""" ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` # pytest.ini [pytest] asyncio_mode = auto ``` -------------------------------------------------------------------------------- /mcp_toolbox/xiaoyuzhoufm/__init__.py: -------------------------------------------------------------------------------- ```python """XiaoyuZhouFM podcast crawler module.""" ``` -------------------------------------------------------------------------------- /mcp_toolbox/enhance/__init__.py: -------------------------------------------------------------------------------- ```python """LLM Enhancement tools for MCP-Toolbox.""" ``` -------------------------------------------------------------------------------- /mcp_toolbox/file_ops/__init__.py: -------------------------------------------------------------------------------- ```python """File operations tools for MCP-Toolbox.""" ``` -------------------------------------------------------------------------------- /.vscode/settings.json: -------------------------------------------------------------------------------- ```json { "python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python" } ``` 
-------------------------------------------------------------------------------- /tests/mock/figma/get_image.json: -------------------------------------------------------------------------------- ```json { "err": null, "images": { "2:0": "https://example.com/images/frame1.png", "3:0": "https://example.com/images/text1.png" } } ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_image_fills.json: -------------------------------------------------------------------------------- ```json { "meta": { "images": { "image1": "https://example.com/images/image1.png", "image2": "https://example.com/images/image2.png" } } } ``` -------------------------------------------------------------------------------- /mcp_toolbox/log.py: -------------------------------------------------------------------------------- ```python import os USER_DEFINED_LOG_LEVEL = os.getenv("MCP_TOOLBOX_LOG_LEVEL", "INFO") os.environ["LOGURU_LEVEL"] = USER_DEFINED_LOG_LEVEL from loguru import logger # noqa: E402 __all__ = ["logger"] ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_team_projects.json: -------------------------------------------------------------------------------- ```json { "projects": [ { "created_at": "2023-01-01T00:00:00Z", "id": "project1", "name": "Project 1" }, { "created_at": "2023-01-02T00:00:00Z", "id": "project2", "name": "Project 2" } ] } ``` -------------------------------------------------------------------------------- /codecov.yaml: -------------------------------------------------------------------------------- ```yaml coverage: range: 70..100 round: down precision: 1 status: project: default: target: 90% threshold: 0.5% patch: default: target: auto threshold: 0% informational: true codecov: token: f927bff4-d404-4986-8c11-624eadda8431 ``` -------------------------------------------------------------------------------- /tests/mock/figma/post_comment.json: 
-------------------------------------------------------------------------------- ```json { "client_meta": { "node_id": "2:0", "x": 300, "y": 400 }, "created_at": "2023-01-04T00:00:00Z", "id": "comment3", "message": "Test comment", "order_id": 3, "resolved_at": null, "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } } ``` -------------------------------------------------------------------------------- /mcp_toolbox/cli.py: -------------------------------------------------------------------------------- ```python import typer from mcp_toolbox.app import mcp app = typer.Typer() @app.command() def stdio(): mcp.run(transport="stdio") @app.command() def sse( host: str = "localhost", port: int = 9871, ): mcp.settings.host = host mcp.settings.port = port mcp.run(transport="sse") if __name__ == "__main__": app(["stdio"]) ``` -------------------------------------------------------------------------------- /.github/workflows/validate-codecov-config.yml: -------------------------------------------------------------------------------- ```yaml name: validate-codecov-config on: pull_request: paths: [codecov.yaml] push: branches: [main] jobs: validate-codecov-config: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 - name: Validate codecov configuration run: curl -sSL --fail-with-body --data-binary @codecov.yaml https://codecov.io/validate ``` -------------------------------------------------------------------------------- /tox.ini: -------------------------------------------------------------------------------- ``` [tox] skipsdist = true envlist = py310, py311, py312, py313 [gh-actions] python = 3.10: py310 3.11: py311 3.12: py312 3.13: py313 [testenv] passenv = PYTHON_VERSION allowlist_externals = uv commands = uv sync --python {envpython} uv run python -m pytest --doctest-modules tests --cov --cov-config=pyproject.toml --cov-report=xml ``` -------------------------------------------------------------------------------- 
/tests/mock/figma/get_project_files.json: -------------------------------------------------------------------------------- ```json { "files": [ { "key": "file1", "last_modified": "2023-01-01T00:00:00Z", "name": "File 1", "thumbnail_url": "https://example.com/thumbnails/file1.png" }, { "key": "file2", "last_modified": "2023-01-02T00:00:00Z", "name": "File 2", "thumbnail_url": "https://example.com/thumbnails/file2.png" } ] } ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_file_styles.json: -------------------------------------------------------------------------------- ```json { "meta": { "styles": { "style1": { "description": "Primary brand color", "key": "style1", "name": "Primary Color", "remote": false, "style_type": "FILL" }, "style2": { "description": "Main heading style", "key": "style2", "name": "Heading 1", "remote": false, "style_type": "TEXT" } } } } ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Install uv FROM python:3.12-slim # Install tini RUN apt-get update && \ apt-get install -y --no-install-recommends tini && \ rm -rf /var/lib/apt/lists/* COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv # Change the working directory to the `app` directory WORKDIR /app # Copy the lockfile and `pyproject.toml` into the image COPY uv.lock /app/uv.lock COPY pyproject.toml /app/pyproject.toml # Install dependencies RUN uv sync --frozen --no-install-project # Copy the project into the image COPY . 
/app # Sync the project RUN uv sync --frozen # Run the server ENTRYPOINT ["tini", "--", "uv", "run", "mcp-toolbox"] CMD ["stdio"] ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_style.json: -------------------------------------------------------------------------------- ```json { "containing_file": { "key": "file1", "name": "UI Styles" }, "created_at": "2023-01-01T00:00:00Z", "description": "Primary brand color", "key": "style1", "name": "Primary Color", "sort_position": "1", "style_properties": { "fills": [ { "color": { "a": 1, "b": 0.8, "g": 0.4, "r": 0.2 }, "type": "SOLID" } ] }, "style_type": "FILL", "thumbnail_url": "https://example.com/thumbnails/style1.png", "updated_at": "2023-01-02T00:00:00Z", "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } } ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: [] properties: figmaApiKey: type: string default: "" description: Optional API key for Figma integration. commandFunction: # A JS function that produces the CLI command based on the given config to start the MCP on stdio. 
|- (config) => ({ command: 'uv', args: ['run', '--prerelease=allow', 'mcp-toolbox@latest', 'stdio'], env: { FIGMA_API_KEY: config.figmaApiKey } }) exampleConfig: figmaApiKey: your-figma-api-key ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_file_components.json: -------------------------------------------------------------------------------- ```json { "meta": { "components": { "component1": { "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "description": "Standard button component", "key": "component1", "name": "Button", "remote": false }, "component2": { "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "description": "Standard input field component", "key": "component2", "name": "Input Field", "remote": false } } } } ``` -------------------------------------------------------------------------------- /.github/actions/setup-python-env/action.yml: -------------------------------------------------------------------------------- ```yaml name: "Setup Python Environment" description: "Set up Python environment for the given Python version" inputs: python-version: description: "Python version to use" required: true default: "3.12" uv-version: description: "uv version to use" required: true default: "0.6.2" runs: using: "composite" steps: - uses: actions/setup-python@v5 with: python-version: ${{ inputs.python-version }} - name: Install uv uses: astral-sh/setup-uv@v2 with: version: ${{ inputs.uv-version }} enable-cache: 'true' cache-suffix: ${{ matrix.python-version }} - name: Install Python dependencies run: uv sync --frozen shell: bash ``` -------------------------------------------------------------------------------- /docs/index.md: -------------------------------------------------------------------------------- ```markdown # mcp-toolbox 
[](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox) [](https://github.com/ai-zerolab/mcp-toolbox/actions/workflows/main.yml?query=branch%3Amain) [](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox) [](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox) A maintained set of tools for enhancing LLMs through the Model Context Protocol (MCP). ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_component.json: -------------------------------------------------------------------------------- ```json { "component_property_definitions": { "size": { "defaultValue": "medium", "type": "VARIANT" }, "variant": { "defaultValue": "primary", "type": "VARIANT" } }, "component_set_id": "component_set1", "containing_file": { "key": "file1", "name": "UI Components" }, "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "created_at": "2023-01-01T00:00:00Z", "description": "Standard button component", "key": "component1", "name": "Button", "thumbnail_url": "https://example.com/thumbnails/component1.png", "updated_at": "2023-01-02T00:00:00Z", "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } } ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_comments.json: -------------------------------------------------------------------------------- ```json { "comments": [ { "client_meta": { "node_id": "2:0", "x": 100, "y": 200 }, "created_at": "2023-01-01T00:00:00Z", "id": "comment1", "message": "This is a comment", "order_id": 1, "resolved_at": null, "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } }, { "client_meta": { "node_id": "3:0", "x": 150, "y": 250 }, "created_at": "2023-01-02T00:00:00Z", "id": "comment2", "message": "Another comment", "order_id": 2, "resolved_at": "2023-01-03T00:00:00Z", "user": { "handle": "user2", "id": 
"user2", "img_url": "https://example.com/user2.png" } } ] } ``` -------------------------------------------------------------------------------- /mcp_toolbox/app.py: -------------------------------------------------------------------------------- ```python from mcp.server.fastmcp import FastMCP from mcp_toolbox.config import Config from mcp_toolbox.log import logger mcp = FastMCP("mcp-toolbox") config = Config() # Import tools to register them with the MCP server if config.enable_commond_tools: import mcp_toolbox.command_line.tools if config.enable_file_ops_tools: import mcp_toolbox.file_ops.tools if config.enable_audio_tools: try: import mcp_toolbox.audio.tools except ImportError: logger.error( "Audio tools is not available. Please install the required dependencies. e.g. `pip install mcp-toolbox[audio]`" ) if config.enabel_enhance_tools: import mcp_toolbox.enhance.tools if config.figma_api_key: import mcp_toolbox.figma.tools if config.bfl_api_key: import mcp_toolbox.flux.tools import mcp_toolbox.markitdown.tools # noqa: E402 import mcp_toolbox.web.tools # noqa: E402 import mcp_toolbox.xiaoyuzhoufm.tools # noqa: E402, F401 # TODO: Add prompt for toolbox's tools ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_team_styles.json: -------------------------------------------------------------------------------- ```json { "styles": [ { "containing_file": { "key": "file1", "name": "UI Styles" }, "created_at": "2023-01-01T00:00:00Z", "description": "Primary brand color", "key": "style1", "name": "Primary Color", "sort_position": "1", "style_type": "FILL", "thumbnail_url": "https://example.com/thumbnails/style1.png", "updated_at": "2023-01-02T00:00:00Z", "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } }, { "containing_file": { "key": "file1", "name": "UI Styles" }, "created_at": "2023-01-03T00:00:00Z", "description": "Main heading style", "key": "style2", "name": "Heading 1", 
"sort_position": "2", "style_type": "TEXT", "thumbnail_url": "https://example.com/thumbnails/style2.png", "updated_at": "2023-01-04T00:00:00Z", "user": { "handle": "user2", "id": "user2", "img_url": "https://example.com/user2.png" } } ] } ``` -------------------------------------------------------------------------------- /mcp_toolbox/config.py: -------------------------------------------------------------------------------- ```python import platform from pathlib import Path from pydantic_settings import BaseSettings class Config(BaseSettings): figma_api_key: str | None = None tavily_api_key: str | None = None duckduckgo_api_key: str | None = None bfl_api_key: str | None = None enable_commond_tools: bool = True enable_file_ops_tools: bool = True enable_audio_tools: bool = True enabel_enhance_tools: bool = True tool_home: str = Path("~/.zerolab/mcp-toolbox").expanduser().as_posix() @property def cache_dir(self) -> str: return (Path(self.tool_home) / "cache").expanduser().resolve().absolute().as_posix() @property def memory_file(self) -> str: # Use Documents folder for macOS to enable sync across multiple Mac devices if platform.system() == "Darwin": # macOS documents_path = Path("~/Documents/zerolab/mcp-toolbox").expanduser() documents_path.mkdir(parents=True, exist_ok=True) return (documents_path / "memory").resolve().absolute().as_posix() else: # Default behavior for other operating systems return (Path(self.tool_home) / "memory").expanduser().resolve().absolute().as_posix() if __name__ == "__main__": print(Config()) ``` -------------------------------------------------------------------------------- /generate_config_template.py: -------------------------------------------------------------------------------- ```python import json import shutil import sys from pathlib import Path from mcp_toolbox.config import Config def get_endpoint_path() -> str: """ Find the path to the mcp-toolbox script. Similar to the 'which' command in Unix-like systems. 
Returns: str: The full path to the mcp-toolbox script """ # First try using shutil.which to find the script in PATH script_path = shutil.which("mcp-toolbox") if script_path: return script_path # If not found in PATH, try to find it in the current Python environment # This handles cases where the script is installed but not in PATH bin_dir = Path(sys.executable).parent possible_paths = [ bin_dir / "mcp-toolbox", bin_dir / "mcp-toolbox.exe", # For Windows ] for path in possible_paths: if path.exists(): return str(path) # If we can't find it, return the script name and hope it's in PATH when executed return "mcp-toolbox" if __name__ == "__main__": endpoint_path = get_endpoint_path() mcp_config = { "command": endpoint_path, "args": ["stdio"], "env": {field.upper(): "" for field in Config.model_fields}, } mcp_item = { "zerolab-toolbox-dev": mcp_config, } print(json.dumps(mcp_item, indent=4)) ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_team_components.json: -------------------------------------------------------------------------------- ```json { "components": [ { "containing_file": { "key": "file1", "name": "UI Components" }, "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "created_at": "2023-01-01T00:00:00Z", "description": "Standard button component", "key": "component1", "name": "Button", "thumbnail_url": "https://example.com/thumbnails/component1.png", "updated_at": "2023-01-02T00:00:00Z", "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } }, { "containing_file": { "key": "file1", "name": "UI Components" }, "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "created_at": "2023-01-03T00:00:00Z", "description": "Standard input field component", "key": "component2", "name": "Input Field", "thumbnail_url": "https://example.com/thumbnails/component2.png", 
"updated_at": "2023-01-04T00:00:00Z", "user": { "handle": "user2", "id": "user2", "img_url": "https://example.com/user2.png" } } ] } ``` -------------------------------------------------------------------------------- /mkdocs.yml: -------------------------------------------------------------------------------- ```yaml site_name: mcp-toolbox repo_url: https://github.com/ai-zerolab/mcp-toolbox site_url: https://ai-zerolab.github.io/mcp-toolbox site_description: Maintenance of a set of tools to enhance LLM through MCP protocols. site_author: ai-zerolab edit_uri: edit/main/docs/ repo_name: ai-zerolab/mcp-toolbox copyright: Maintained by <a href="https://ai-zerolab.com">ai-zerolab</a>. nav: - Home: index.md - Modules: modules.md plugins: - search - mkdocstrings: handlers: python: paths: ["mcp_toolbox"] theme: name: material feature: tabs: true palette: - media: "(prefers-color-scheme: light)" scheme: default primary: white accent: deep orange toggle: icon: material/brightness-7 name: Switch to dark mode - media: "(prefers-color-scheme: dark)" scheme: slate primary: black accent: deep orange toggle: icon: material/brightness-4 name: Switch to light mode icon: repo: fontawesome/brands/github extra: social: - icon: fontawesome/brands/github link: https://github.com/ai-zerolab/mcp-toolbox - icon: fontawesome/brands/python link: https://pypi.org/project/mcp-toolbox markdown_extensions: - toc: permalink: true - pymdownx.arithmatex: generic: true ``` -------------------------------------------------------------------------------- /tests/mock/figma/get_team_component_sets.json: -------------------------------------------------------------------------------- ```json { "component_sets": [ { "containing_file": { "key": "file1", "name": "UI Components" }, "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "created_at": "2023-01-01T00:00:00Z", "description": "Button component set with variants", "key": "component_set1", "name": 
"Button", "thumbnail_url": "https://example.com/thumbnails/component_set1.png", "updated_at": "2023-01-02T00:00:00Z", "user": { "handle": "user1", "id": "user1", "img_url": "https://example.com/user1.png" } }, { "containing_file": { "key": "file1", "name": "UI Components" }, "containing_frame": { "name": "Components", "node_id": "4:0", "page_id": "1:0", "page_name": "Page 1" }, "created_at": "2023-01-03T00:00:00Z", "description": "Input field component set with variants", "key": "component_set2", "name": "Input Field", "thumbnail_url": "https://example.com/thumbnails/component_set2.png", "updated_at": "2023-01-04T00:00:00Z", "user": { "handle": "user2", "id": "user2", "img_url": "https://example.com/user2.png" } } ] } ``` -------------------------------------------------------------------------------- /.github/workflows/main.yml: -------------------------------------------------------------------------------- ```yaml name: Main on: push: branches: - main pull_request: types: [opened, synchronize, reopened, ready_for_review] jobs: quality: runs-on: ubuntu-latest steps: - name: Check out uses: actions/checkout@v4 - uses: actions/cache@v4 with: path: ~/.cache/pre-commit key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - name: Set up the environment uses: ./.github/actions/setup-python-env - name: Run checks run: make check tests-and-type-check: runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] fail-fast: false defaults: run: shell: bash steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: ./.github/actions/setup-python-env with: python-version: ${{ matrix.python-version }} - name: Run tests run: uv run python -m pytest tests --cov --cov-config=pyproject.toml --cov-report=xml - name: Upload coverage reports to Codecov with GitHub Action on Python 3.11 uses: codecov/codecov-action@v4 if: ${{ matrix.python-version == '3.11' }} check-docs: runs-on: ubuntu-latest steps: - name: Check out 
@mcp.tool(
    description="Convert any file to Markdown, using MarkItDown.",
)
async def convert_file_to_markdown(
    input_file: Annotated[str, Field(description="The input file to convert")],
    output_file: Annotated[str, Field(description="The output Markdown file")],
) -> dict[str, Any]:
    """Convert any file to Markdown and write the result to ``output_file``.

    Args:
        input_file: Path of the file to convert ("~" is expanded).
        output_file: Path of the Markdown file to write; missing parent
            directories are created.

    Returns:
        ``{"success": True, "input_file": ..., "output_file": ...}`` on
        success, or ``{"success": False, "error": ...}`` when ``input_file``
        is not an existing file.
    """
    input_path = Path(input_file).expanduser().resolve().absolute()
    output_path = Path(output_file).expanduser().resolve().absolute()

    if not input_path.is_file():
        return {
            "error": f"Input file not found: {input_path.as_posix()}",
            "success": False,
        }

    output_path.parent.mkdir(parents=True, exist_ok=True)

    markdown = md.convert(input_path.as_posix()).text_content
    # Write UTF-8 explicitly: the locale default encoding can fail on
    # characters that commonly appear in converted documents.
    output_path.write_text(markdown, encoding="utf-8")

    return {
        "success": True,
        "input_file": input_path.as_posix(),
        "output_file": output_path.as_posix(),
    }


@mcp.tool(
    description="Convert a URL to Markdown, using MarkItDown.",
)
async def convert_url_to_markdown(
    url: Annotated[str, Field(description="The URL to convert")],
    output_file: Annotated[str, Field(description="The output Markdown file")],
) -> dict[str, Any]:
    """Convert the content behind a URL to Markdown and write it to ``output_file``.

    Args:
        url: The URL to convert.
        output_file: Path of the Markdown file to write; missing parent
            directories are created.

    Returns:
        ``{"success": True, "url": ..., "output_file": ...}``. Errors raised
        by the conversion itself are not caught here and propagate.
    """
    output_path = Path(output_file).expanduser().resolve().absolute()

    output_path.parent.mkdir(parents=True, exist_ok=True)

    markdown = md.convert_url(url).text_content
    # Same reasoning as for file conversion: do not depend on the locale encoding.
    output_path.write_text(markdown, encoding="utf-8")

    return {
        "success": True,
        "url": url,
        "output_file": output_path.as_posix(),
    }
"g": 1, "r": 1 }, "type": "SOLID" } ], "id": "2:0", "name": "Frame 1", "strokeAlign": "INSIDE", "strokeWeight": 1, "strokes": [], "type": "FRAME" } }, "3:0": { "document": { "absoluteBoundingBox": { "height": 20, "width": 80, "x": 10, "y": 10 }, "blendMode": "PASS_THROUGH", "characters": "Hello World", "constraints": { "horizontal": "LEFT", "vertical": "TOP" }, "effects": [], "fills": [ { "blendMode": "NORMAL", "color": { "a": 1, "b": 0, "g": 0, "r": 0 }, "type": "SOLID" } ], "id": "3:0", "name": "Text Layer", "strokeAlign": "INSIDE", "strokeWeight": 1, "strokes": [], "style": { "fontFamily": "Roboto", "fontPostScriptName": "Roboto-Regular", "fontSize": 14, "fontWeight": 400, "letterSpacing": 0, "lineHeightPercent": 100, "lineHeightPx": 16.4, "textAlignHorizontal": "LEFT", "textAlignVertical": "TOP" }, "type": "TEXT" } } } } ``` -------------------------------------------------------------------------------- /mcp_toolbox/command_line/tools.py: -------------------------------------------------------------------------------- ```python """Command line execution tools for MCP-Toolbox.""" import asyncio import contextlib import os from pathlib import Path from typing import Annotated, Any from pydantic import Field from mcp_toolbox.app import mcp @mcp.tool(description="Execute a command line instruction.") async def execute_command( command: Annotated[list[str], Field(description="The command to execute as a list of strings")], timeout_seconds: Annotated[int, Field(default=30, description="Maximum execution time in seconds")] = 30, working_dir: Annotated[str | None, Field(default=None, description="Directory to execute the command in")] = None, ) -> dict[str, Any]: """Execute a command line instruction.""" if not command: return { "error": "Command cannot be empty", "stdout": "", "stderr": "", "return_code": 1, } try: # Expand user home directory in working_dir if provided expanded_working_dir = Path(working_dir).expanduser() if working_dir else working_dir # Create 
subprocess with current environment process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=os.environ, cwd=expanded_working_dir, ) try: # Wait for the process with timeout stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout_seconds) # Decode output stdout_str = stdout.decode("utf-8", errors="replace") if stdout else "" stderr_str = stderr.decode("utf-8", errors="replace") if stderr else "" return { "stdout": stdout_str, "stderr": stderr_str, "return_code": process.returncode, } except asyncio.TimeoutError: # Kill the process if it times out with contextlib.suppress(ProcessLookupError): process.kill() return { "error": f"Command execution timed out after {timeout_seconds} seconds", "stdout": "", "stderr": "", "return_code": 124, # Standard timeout return code } except Exception as e: return { "error": f"Failed to execute command: {e!s}", "stdout": "", "stderr": "", "return_code": 1, } ``` -------------------------------------------------------------------------------- /.github/workflows/on-release-main.yml: -------------------------------------------------------------------------------- ```yaml name: release-main permissions: contents: write packages: write on: release: types: [published] jobs: set-version: runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - name: Export tag id: vars run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT if: ${{ github.event_name == 'release' }} - name: Update project version run: | sed -i "s/^version = \".*\"/version = \"$RELEASE_VERSION\"/" pyproject.toml env: RELEASE_VERSION: ${{ steps.vars.outputs.tag }} if: ${{ github.event_name == 'release' }} - name: Upload updated pyproject.toml uses: actions/upload-artifact@v4 with: name: pyproject-toml path: pyproject.toml publish: runs-on: ubuntu-latest needs: [set-version] steps: - name: Check out uses: actions/checkout@v4 - name: Set up the environment uses: 
@mcp.tool(
    description="Use the tool to think about something. It will not obtain new information or change the database, but just append the thought to the log. Use it when complex reasoning or some cache memory is needed."
)
async def think(
    thought: Annotated[str, Field(description="A thought to think about.")],
) -> dict[str, str]:
    """Return the given thought unchanged under the ``"thought"`` key.

    see: https://www.anthropic.com/engineering/claude-think-tool
    """
    return {
        "thought": thought,
    }


# The memory tools depend on optional packages; they are only registered when
# the memory backend can be imported.
try:
    from mcp_toolbox.enhance.memory import LocalMemory, get_current_session_memory
except ImportError:
    # The extra that installs the memory dependencies is named `memory` in
    # pyproject.toml (there is no `enhance` extra), so point users at that one.
    logger.error(
        "Memory tools are not available. Please install the required dependencies. e.g. `pip install mcp-toolbox[memory]`"
    )
else:

    @mcp.tool(description="Get the current session id.")
    def get_session_id() -> dict[str, str]:
        """Return the session id of the current session memory."""
        memory: LocalMemory = get_current_session_memory()
        return {"session_id": memory.session_id}

    @mcp.tool(description="Store a memory in the memory database.")
    def remember(
        brief: Annotated[str, Field(description="The brief information of the memory.")],
        detail: Annotated[str, Field(description="The detailed information of the brief text.")],
    ) -> dict[str, str]:
        """Store ``brief``/``detail`` in the current session memory and echo them back."""
        memory: LocalMemory = get_current_session_memory()
        memory.store(brief, detail)
        return {
            "session_id": memory.session_id,
            "brief": brief,
            "detail": detail,
        }

    @mcp.tool(description="Query a memory from the memory database.")
    def recall(
        query: Annotated[str, Field(description="The query to search in the memory database.")],
        top_k: Annotated[
            int,
            Field(
                description="The maximum number of results to return. Default to 5.",
                default=5,
            ),
        ] = 5,
        cross_session: Annotated[
            bool,
            Field(
                description="Whether to search across all sessions. Default to True.",
                default=True,
            ),
        ] = True,
        session_id: Annotated[
            str | None,
            Field(
                description="The session id of the memory. If not provided, the current session id will be used.",
                default=None,
            ),
        ] = None,
    ) -> list[dict[str, str]]:
        """Query the memory store and return the matches as plain dicts.

        An explicit ``session_id`` selects that session's memory; otherwise
        the current session memory is used. ``None`` fields are dropped from
        each result.
        """
        if session_id:
            memory = LocalMemory.use_session(session_id)
        else:
            memory = get_current_session_memory()
        results = memory.query(query, top_k=top_k, cross_session=cross_session)
        return [r.model_dump(exclude_none=True) for r in results]

    @mcp.tool(description="Clear all memories in the memory database.")
    def forget() -> dict[str, str]:
        """Call ``clear()`` on the current session memory and confirm.

        NOTE(review): whether ``clear()`` affects only the current session or
        every session is decided by ``LocalMemory`` — confirm there.
        """
        memory: LocalMemory = get_current_session_memory()
        memory.clear()
        return {"message": "All memories are cleared."}
"pytest-cov>=4.0.0", "ruff>=0.9.2", "mkdocs>=1.4.2", "mkdocs-material>=8.5.10", "mkdocstrings[python]>=0.26.1", "pytest-asyncio>=0.25.3", "mcp-toolbox[all]", ] [project.optional-dependencies] audio = ["openai-whisper>=20240930 ; python_version <= '3.12'"] memory = ["fastembed>=0.6.0", "portalocker>=3.1.1"] all = ["mcp-toolbox[audio, memory]"] [project.scripts] mcp-toolbox = "mcp_toolbox.cli:app" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.setuptools] py-modules = ["mcp_toolbox"] [tool.pytest.ini_options] testpaths = ["tests"] [tool.ruff] target-version = "py310" line-length = 120 fix = true [tool.ruff.lint] select = [ # flake8-2020 "YTT", # flake8-bandit "S", # flake8-bugbear "B", # flake8-builtins "A", # flake8-comprehensions "C4", # flake8-debugger "T10", # flake8-simplify "SIM", # isort "I", # mccabe "C90", # pycodestyle "E", "W", # pyflakes "F", # pygrep-hooks "PGH", # pyupgrade "UP", # ruff "RUF", # tryceratops "TRY", ] ignore = [ # LineTooLong "E501", # DoNotAssignLambda "E731", # raise-vanilla-args "TRY003", # try-consider-else "TRY300", # raise-within-try "TRY301", ] [tool.ruff.lint.per-file-ignores] "tests/*" = ["S101", "C901", "F841", "S108", "F821"] "mcp_toolbox/flux/api.py" = ["C901", "SIM102"] [tool.ruff.format] preview = true [tool.coverage.report] skip_empty = true [tool.coverage.run] branch = true source = ["mcp_toolbox"] omit = ["mcp_toolbox/flux/api.py"] [tool.deptry] exclude = ["mcp_toolbox/app.py", ".venv", "tests"] [tool.deptry.per_rule_ignores] DEP002 = ["mcp", "mcp-toolbox"] ``` -------------------------------------------------------------------------------- /tests/audio/test_audio_tools.py: -------------------------------------------------------------------------------- ```python """Tests for audio tools.""" from unittest.mock import MagicMock, patch import pytest try: from mcp_toolbox.audio.tools import get_audio_length, get_audio_text except ImportError: pytest.skip("Audio tools are not available.", 
allow_module_level=True) @pytest.fixture def mock_whisper(): """Mock whisper module.""" with patch("mcp_toolbox.audio.tools.whisper") as mock_whisper: # Mock load_audio to return a numpy array of a specific length mock_audio = MagicMock() mock_audio.__len__.return_value = 16000 * 60 # 60 seconds of audio at 16kHz mock_whisper.load_audio.return_value = mock_audio # Mock the model mock_model = MagicMock() mock_model.detect_language.return_value = (None, {"en": 0.9, "zh": 0.1}) mock_model.transcribe.return_value = {"text": "Successfully transcribed audio"} mock_whisper.load_model.return_value = mock_model yield mock_whisper @pytest.fixture def mock_os_path_exists(): """Mock os.path.exists to return True.""" with patch("os.path.exists", return_value=True): yield @pytest.mark.asyncio async def test_get_audio_length(mock_whisper, mock_os_path_exists): """Test get_audio_length function.""" result = await get_audio_length("test.m4a") # Check that the function returns the expected values assert "duration_seconds" in result assert "formatted_duration" in result assert "message" in result assert result["duration_seconds"] == 60.0 assert result["formatted_duration"] == "0:01:00" assert "60.00 seconds" in result["message"] # Check that whisper.load_audio was called with the correct arguments mock_whisper.load_audio.assert_called_once_with("test.m4a") @pytest.mark.asyncio async def test_get_audio_length_file_not_found(): """Test get_audio_length function with a non-existent file.""" with patch("os.path.exists", return_value=False): result = await get_audio_length("nonexistent.m4a") # Check that the function returns an error assert "error" in result assert "message" in result assert "not found" in result["error"] @pytest.mark.asyncio async def test_get_audio_text(mock_whisper, mock_os_path_exists): """Test get_audio_text function.""" # Set up global variables in the module with patch("mcp_toolbox.audio.tools._detected_language", "en"): result = await get_audio_text("test.m4a", 
@mcp.tool(description="Generate an image using the Flux API and save it to a local file.")
async def flux_generate_image(
    prompt: Annotated[str, Field(description="The text prompt for image generation")],
    output_dir: Annotated[str, Field(description="The directory to save the image")],
    model_name: Annotated[str, Field(default="flux.1.1-pro", description="The model version to use")] = "flux.1.1-pro",
    width: Annotated[int | None, Field(default=None, description="Width of the image in pixels")] = None,
    height: Annotated[int | None, Field(default=None, description="Height of the image in pixels")] = None,
    seed: Annotated[int | None, Field(default=None, description="Seed for reproducibility")] = None,
) -> dict[str, Any]:
    """Generate an image using the Flux API and save it to a local file.

    Args:
        prompt: The text prompt for image generation
        output_dir: The directory to save the image (created if missing)
        model_name: The model version to use (default: flux.1.1-pro)
        width: Width of the image in pixels (must be a multiple of 32, between 256 and 1440)
        height: Height of the image in pixels (must be a multiple of 32, between 256 and 1440)
        seed: Optional seed for reproducibility

    Returns:
        A dictionary containing information about the generated image. It
        always has a ``success`` flag; on success it also carries ``prompt``,
        ``model``, ``image_path``, ``image_url`` and ``message``; on failure
        it carries ``error`` (and ``message`` for errors raised during
        generation).
    """
    config = Config()

    if not config.bfl_api_key:
        return {
            "success": False,
            "error": "BFL_API_KEY not provided. Set BFL_API_KEY environment variable.",
        }

    try:
        # Create output directory if it doesn't exist
        output_path = Path(output_dir).expanduser().resolve()
        output_path.mkdir(parents=True, exist_ok=True)

        # Generate a filename from the first five words of the prompt: lower-cased,
        # non-alphanumeric characters replaced by "_", capped at 50 characters.
        filename = "_".join(prompt.split()[:5]).lower()
        filename = "".join(c if c.isalnum() or c == "_" else "_" for c in filename)[:50]
        if not filename:
            # An empty or whitespace-only prompt would leave the stem empty, making
            # `output_path / filename` the directory itself; use a fixed stem so the
            # image is always written inside the output directory.
            filename = "image"

        # Full path for the image (extension will be added by the save method)
        image_path = output_path / filename

        logger.info(f"Generating image with prompt: {prompt}")

        # Create image request
        image_request = ImageRequest(
            prompt=prompt,
            name=model_name,
            width=width,
            height=height,
            seed=seed,
            api_key=config.bfl_api_key,
            validate=True,
        )

        # Request and save the image
        logger.info("Requesting image from Flux API...")
        await image_request.request()

        logger.info("Waiting for image generation to complete...")
        await image_request.retrieve()

        logger.info("Saving image to disk...")
        saved_path = await image_request.save(str(image_path))

        # Get the image URL
        image_url = await image_request.get_url()

        return {
            "success": True,
            "prompt": prompt,
            "model": model_name,
            "image_path": saved_path,
            "image_url": image_url,
            "message": f"Successfully generated and saved image to {saved_path}",
        }

    except ApiException as e:
        return {
            "success": False,
            "error": f"API error: {e}",
            "message": f"Failed to generate image: {e}",
        }
    except ValueError as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Invalid parameters: {e}",
        }
    except Exception as e:
        # Tool boundary: report unexpected failures as a result dict.
        return {
            "success": False,
            "error": str(e),
            "message": f"Failed to generate image: {e}",
        }
was created with correct parameters mock_class.assert_called_once_with( prompt="a beautiful landscape", name="flux.1.1-pro", width=512, height=512, seed=42, api_key="test_api_key", validate=True, ) # Check that methods were called mock_instance.request.assert_called_once() mock_instance.retrieve.assert_called_once() mock_instance.save.assert_called_once() mock_instance.get_url.assert_called_once() # Check result assert result["success"] is True assert result["prompt"] == "a beautiful landscape" assert result["model"] == "flux.1.1-pro" assert result["image_path"] == "/path/to/saved/image.png" assert result["image_url"] == "https://example.com/image.png" assert "Successfully generated" in result["message"] @pytest.mark.asyncio async def test_flux_generate_image_no_api_key(): """Test image generation with no API key.""" with patch("mcp_toolbox.flux.tools.Config") as mock_config: config_instance = MagicMock() config_instance.bfl_api_key = None mock_config.return_value = config_instance result = await flux_generate_image( prompt="a beautiful landscape", output_dir="/tmp/images", ) assert result["success"] is False assert "BFL_API_KEY not provided" in result["error"] @pytest.mark.asyncio async def test_flux_generate_image_api_exception(mock_config): """Test image generation with API exception.""" with patch("mcp_toolbox.flux.tools.ImageRequest") as mock_class: instance = AsyncMock() instance.request = AsyncMock(side_effect=ApiException(400, "Invalid request")) mock_class.return_value = instance result = await flux_generate_image( prompt="a beautiful landscape", output_dir="/tmp/images", ) assert result["success"] is False assert "API error" in result["error"] @pytest.mark.asyncio async def test_flux_generate_image_value_error(mock_config): """Test image generation with value error.""" with patch("mcp_toolbox.flux.tools.ImageRequest") as mock_class: instance = AsyncMock() instance.request = AsyncMock(side_effect=ValueError("Invalid width")) mock_class.return_value = 
instance result = await flux_generate_image( prompt="a beautiful landscape", output_dir="/tmp/images", width=123, # Not a multiple of 32 ) assert result["success"] is False assert "Invalid parameters" in result["message"] ``` -------------------------------------------------------------------------------- /tests/markitdown/test_markitdown_tools.py: -------------------------------------------------------------------------------- ```python """Tests for Markitdown tools.""" from unittest.mock import MagicMock, patch import pytest from mcp_toolbox.markitdown.tools import ( convert_file_to_markdown, convert_url_to_markdown, md, ) # Mock for MarkItDown.convert method @pytest.fixture def mock_markitdown_convert(): """Mock for MarkItDown.convert method.""" with patch.object(md, "convert") as mock_convert: # Set up the mock to return a result with text_content mock_result = MagicMock() mock_result.text_content = "# Converted Markdown\n\nThis is converted content." mock_convert.return_value = mock_result yield mock_convert @pytest.fixture def mock_markitdown_convert_url(): """Mock for MarkItDown.convert method.""" with patch.object(md, "convert_url") as mock_convert: # Set up the mock to return a result with text_content mock_result = MagicMock() mock_result.text_content = "# Converted Markdown\n\nThis is converted content." 
@pytest.mark.asyncio
async def test_convert_file_to_markdown_exception(mock_markitdown_convert):
    """An error raised by ``md.convert`` propagates out of ``convert_file_to_markdown``."""
    # Make the mocked converter fail.
    mock_markitdown_convert.side_effect = Exception("Conversion error")

    with (
        # Pretend the input file exists so the conversion step is reached.
        patch("pathlib.Path.is_file", return_value=True),
        patch("pathlib.Path.read_text", return_value="Original content"),
        patch("pathlib.Path.mkdir"),
        # `match` pins the message, so an unrelated failure cannot satisfy
        # the deliberately broad `Exception` type.
        pytest.raises(Exception, match="Conversion error"),
    ):
        await convert_file_to_markdown("input.txt", "output.md")
@pytest.mark.asyncio async def test_convert_url_to_markdown_success(mock_markitdown_convert_url): """Test successful file conversion.""" # Mock file operations with ( patch("pathlib.Path.write_text") as mock_write_text, patch("pathlib.Path.mkdir") as mock_mkdir, ): # Call the function result = await convert_url_to_markdown("https://example.com", "output.md") # Verify the output file was written with the converted content mock_write_text.assert_called_once_with("# Converted Markdown\n\nThis is converted content.") # Verify the output directory was created mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) # Verify the result is as expected assert result["success"] is True assert "https://example.com" in result["url"] assert "output.md" in result["output_file"] ``` -------------------------------------------------------------------------------- /mcp_toolbox/web/tools.py: -------------------------------------------------------------------------------- ```python import functools from pathlib import Path from typing import Annotated, Any, Literal import anyio from duckduckgo_search import DDGS from httpx import AsyncClient from pydantic import Field from tavily import AsyncTavilyClient from mcp_toolbox.app import mcp from mcp_toolbox.config import Config client = AsyncClient( follow_redirects=True, ) async def get_http_content( url: Annotated[str, Field(description="The URL to request")], method: Annotated[str, Field(default="GET", description="HTTP method to use")] = "GET", headers: Annotated[dict[str, str] | None, Field(default=None, description="Optional HTTP headers")] = None, params: Annotated[dict[str, str] | None, Field(default=None, description="Optional query parameters")] = None, data: Annotated[dict[str, str] | None, Field(default=None, description="Optional request body data")] = None, timeout: Annotated[int, Field(default=60, description="Request timeout in seconds")] = 60, ) -> str: response = await client.request( method, url, 
@mcp.tool(
    description="Save HTML from a URL.",
)
async def save_html(
    url: Annotated[str, Field(description="The URL to save")],
    output_path: Annotated[str, Field(description="The path to save the HTML")],
) -> dict[str, Any]:
    """Fetch ``url`` and write the response text to ``output_path``.

    Args:
        url: The URL to download.
        output_path: Destination file; ``~`` is expanded and missing parent
            directories are created.

    Returns:
        ``{"success": True, "url": ..., "output_path": ...}`` on success,
        otherwise ``{"success": False, "error": ...}``. Failures are reported
        through the returned dict rather than raised.
    """
    # Download first so a failed request does not leave empty directories behind.
    try:
        content = await get_http_content(url)
    except Exception as e:  # tool boundary: report instead of raising
        return {
            "success": False,
            "error": f"Failed to save HTML: {e!s}",
        }

    # Path resolution and directory creation can fail too (permissions,
    # unresolvable home directory), so they share the error handling of the write.
    try:
        target = Path(output_path).expanduser().resolve()
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
    except Exception as e:  # tool boundary: report instead of raising
        return {
            "success": False,
            "error": f"Failed to save HTML: {e!s}",
        }

    return {
        "success": True,
        "url": url,
        "output_path": target.as_posix(),
    }
Config().duckduckgo_api_key: @mcp.tool( description="Search with DuckDuckGo.", ) async def search_with_duckduckgo( query: Annotated[str, Field(description="The search query")], max_results: Annotated[int, Field(default=10, description="The maximum number of results")] = 10, ) -> list[dict[str, Any]]: ddg = DDGS(Config().duckduckgo_api_key) search = functools.partial(ddg.text, max_results=max_results) results = await anyio.to_thread.run_sync(search, query) if len(results) == 0: return { "success": False, "error": "No search results found.", } return results ``` -------------------------------------------------------------------------------- /mcp_toolbox/xiaoyuzhoufm/tools.py: -------------------------------------------------------------------------------- ```python """XiaoyuZhouFM podcast crawler tools.""" import os import re from typing import Annotated, Any import httpx from loguru import logger from pydantic import Field from mcp_toolbox.app import mcp from mcp_toolbox.config import Config class XiaoyuZhouFMCrawler: """XiaoyuZhouFM podcast crawler.""" def __init__(self): """Initialize the crawler.""" self.config = Config() async def extract_audio_url(self, url: str) -> str: """Extract audio URL from XiaoyuZhouFM episode page. 
@mcp.tool(description="Crawl and download a podcast episode from XiaoyuZhouFM.")
async def xiaoyuzhoufm_download(
    xiaoyuzhoufm_url: Annotated[str, Field(description="The URL of the XiaoyuZhouFM episode")],
    output_dir: Annotated[str, Field(description="The directory to save the audio file")],
) -> dict[str, Any]:
    """Crawl and download a podcast episode from XiaoyuZhouFM.

    The audio is saved as ``<output_dir>/<episode id>.<extension>``. The
    episode id is the last path segment of ``xiaoyuzhoufm_url`` (falling back
    to ``"episode"``), and the extension is taken from the audio URL's path
    (falling back to ``"m4a"``).

    Args:
        xiaoyuzhoufm_url: The URL of the XiaoyuZhouFM episode
        output_dir: The directory to save the audio file

    Returns:
        On success, a dictionary with ``success``, ``audio_url``,
        ``downloaded_path`` and ``message``. On failure, a dictionary with
        ``success``, ``error`` and ``message``; errors are not raised.
    """
    from urllib.parse import urlparse

    try:
        # Validate URL
        if not xiaoyuzhoufm_url.startswith("https://www.xiaoyuzhoufm.com/episode/"):
            raise ValueError("Invalid XiaoyuZhouFM URL. URL should start with 'https://www.xiaoyuzhoufm.com/episode/'")

        # Work on the URL *path* only, so a "?query" or "#fragment" never
        # ends up inside the file name. The first segment is "episode" itself.
        segments = [part for part in urlparse(xiaoyuzhoufm_url).path.split("/") if part]
        episode_id = segments[-1] if len(segments) > 1 else "episode"

        # Extract audio URL
        audio_url = await crawler.extract_audio_url(xiaoyuzhoufm_url)

        # Determine the file extension from the audio URL's path (query string ignored).
        audio_name = urlparse(audio_url).path.rsplit("/", 1)[-1]
        _, dot, suffix = audio_name.rpartition(".")
        file_extension = suffix if dot and suffix else "m4a"

        # Create output path with episode ID as filename
        output_path = os.path.join(output_dir, f"{episode_id}.{file_extension}")

        # Download audio
        downloaded_path = await crawler.download_audio(audio_url, output_path)

        return {
            "success": True,
            "audio_url": audio_url,
            "downloaded_path": downloaded_path,
            "message": f"Successfully downloaded podcast to {downloaded_path}",
        }
    except Exception as e:  # tool boundary: report instead of raising
        return {
            "success": False,
            "error": str(e),
            "message": f"Failed to download podcast: {e!s}",
        }
mock_dir / filename if not file_path.exists(): # Create empty mock data if it doesn't exist mock_data = {"mock": "data"} with open(file_path, "w") as f: json.dump(mock_data, f) with open(file_path) as f: return json.load(f) # Mock HTML content with audio URL MOCK_HTML_CONTENT = """ <!DOCTYPE html> <html> <head> <meta property="og:audio" content="https://media.example.com/podcasts/episode123.m4a"> <title>Test Episode</title> </head> <body> <h1>Test Episode</h1> </body> </html> """ # Test XiaoyuZhouFMCrawler.extract_audio_url method @pytest.mark.asyncio async def test_extract_audio_url(): # Create a mock response mock_response = MagicMock() mock_response.text = MOCK_HTML_CONTENT mock_response.raise_for_status = MagicMock() # Changed from AsyncMock to MagicMock # Create a mock client mock_client = AsyncMock() mock_client.__aenter__.return_value.get.return_value = mock_response # Patch httpx.AsyncClient with patch("httpx.AsyncClient", return_value=mock_client): crawler = XiaoyuZhouFMCrawler() audio_url = await crawler.extract_audio_url("https://www.xiaoyuzhoufm.com/episode/test") # Assert the audio URL is extracted correctly assert audio_url == "https://media.example.com/podcasts/episode123.m4a" # Test XiaoyuZhouFMCrawler.download_audio method @pytest.mark.asyncio async def test_download_audio(tmp_path): # Create a mock response with binary content mock_response = MagicMock() mock_response.content = b"test audio content" mock_response.raise_for_status = MagicMock() # Changed from AsyncMock to MagicMock # Create a mock client mock_client = AsyncMock() mock_client.__aenter__.return_value.get.return_value = mock_response # Patch httpx.AsyncClient with patch("httpx.AsyncClient", return_value=mock_client): crawler = XiaoyuZhouFMCrawler() output_path = str(tmp_path / "test_audio.m4a") downloaded_path = await crawler.download_audio("https://media.example.com/podcasts/episode123.m4a", output_path) # Assert the file was downloaded correctly assert downloaded_path == output_path 
assert os.path.exists(output_path) with open(output_path, "rb") as f: content = f.read() assert content == b"test audio content" # Test xiaoyuzhoufm_download tool @pytest.mark.asyncio async def test_xiaoyuzhoufm_download(): # Mock the crawler methods with ( patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.extract_audio_url") as mock_extract, patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.download_audio") as mock_download, ): # Set up the mocks mock_extract.return_value = "https://media.example.com/podcasts/episode123.m4a" mock_download.return_value = "/tmp/test/test.m4a" # Call the tool result = await xiaoyuzhoufm_download("https://www.xiaoyuzhoufm.com/episode/test", "/tmp/test") # Assert the result assert result["audio_url"] == "https://media.example.com/podcasts/episode123.m4a" assert result["downloaded_path"] == "/tmp/test/test.m4a" assert "Successfully downloaded" in result["message"] # Verify the mocks were called correctly mock_extract.assert_called_once_with("https://www.xiaoyuzhoufm.com/episode/test") # The output path should be constructed from the output_dir and episode ID mock_download.assert_called_once_with("https://media.example.com/podcasts/episode123.m4a", "/tmp/test/test.m4a") # Test xiaoyuzhoufm_download tool with invalid URL @pytest.mark.asyncio async def test_xiaoyuzhoufm_download_invalid_url(): # Call the tool with an invalid URL result = await xiaoyuzhoufm_download("https://invalid-url.com", "/tmp/test") # Assert the result contains an error assert "error" in result assert "Invalid XiaoyuZhouFM URL" in result["message"] # Test xiaoyuzhoufm_download tool with extraction error @pytest.mark.asyncio async def test_xiaoyuzhoufm_download_extraction_error(): # Mock the crawler methods to raise an error with patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.extract_audio_url") as mock_extract: # Set up the mock to raise an error mock_extract.side_effect = ValueError("Could not find audio URL") # Call the tool result = await 
xiaoyuzhoufm_download("https://www.xiaoyuzhoufm.com/episode/test", "/tmp/test") # Assert the result contains an error assert "error" in result assert "Could not find audio URL" in result["error"] assert "Failed to download podcast" in result["message"] ``` -------------------------------------------------------------------------------- /tests/enhance/test_memory.py: -------------------------------------------------------------------------------- ```python import pytest from mcp_toolbox.enhance.memory import LocalMemory, MemoryModel @pytest.fixture def memory_file(tmp_path): return tmp_path / "test-memory" @pytest.fixture def local_memory(memory_file): memory = LocalMemory("test-session", memory_file) # Ensure the file is empty at the start of each test memory.clear() return memory def test_memory_basic(local_memory: LocalMemory): """Test basic memory operations""" assert local_memory.session_id == "test-session" # Store and query memory_model = local_memory.store("test-brief", "test-detail") assert isinstance(memory_model, MemoryModel) assert memory_model.session_id == "test-session" assert memory_model.brief == "test-brief" assert memory_model.detail == "test-detail" assert memory_model.embedding is not None # Query results = local_memory.query("test-brief") assert len(results) == 1 assert results[0].brief == "test-brief" assert results[0].detail == "test-detail" assert results[0].session_id == "test-session" def test_memory_cross_session(memory_file): """Test cross-session memory operations""" # Create two memory instances with different session IDs memory1 = LocalMemory("session-1", memory_file) memory1.clear() # Start with a clean file # Store a memory in session 1 memory1.store("brief-1", "detail-1") # Create a second memory instance with a different session ID memory2 = LocalMemory("session-2", memory_file) # Store a memory in session 2 memory2.store("brief-2", "detail-2") # Refresh memory1 to see both entries memory1.current_memory = memory1._load() # Query 
with cross_session=True (default) results1 = memory1.query("brief", top_k=5, refresh=True) assert len(results1) == 2, f"Expected 2 results, got {len(results1)}: {results1}" # Query with cross_session=False results2 = memory1.query("brief", top_k=5, cross_session=False) assert len(results2) == 1, f"Expected 1 result, got {len(results2)}: {results2}" assert results2[0].session_id == "session-1" results3 = memory2.query("brief", top_k=5, cross_session=False) assert len(results3) == 1, f"Expected 1 result, got {len(results3)}: {results3}" assert results3[0].session_id == "session-2" def test_memory_clear(memory_file): """Test clearing memory""" # Create a new memory instance memory = LocalMemory("test-session", memory_file) memory.clear() # Start with a clean file # Store some memories memory.store("brief-1", "detail-1") memory.store("brief-2", "detail-2") # Verify memories are stored results = memory.query("brief", top_k=5) assert len(results) == 2, f"Expected 2 results, got {len(results)}: {results}" # Clear memories memory.clear() # Verify memories are cleared results = memory.query("brief", top_k=5) assert len(results) == 0, f"Expected 0 results, got {len(results)}: {results}" def test_memory_empty_file(memory_file): """Test handling of empty memory file""" # Create a new memory instance with a non-existent file memory = LocalMemory("test-session", memory_file) memory.clear() # Start with a clean file # Query should return empty list results = memory.query("test") assert len(results) == 0 # Store should work even with empty file memory.store("test-brief", "test-detail") results = memory.query("test") assert len(results) == 1 def test_memory_top_k(memory_file): """Test top_k parameter in query""" # Create a new memory instance memory = LocalMemory("test-session", memory_file) memory.clear() # Start with a clean file # Store multiple memories with distinct embeddings memory.store("apple", "A fruit") memory.store("banana", "A yellow fruit") memory.store("orange", "A 
class MockProcess:
    """Test double for the process object returned by ``asyncio.create_subprocess_exec``.

    ``communicate`` is an ``AsyncMock`` that resolves to ``(stdout, stderr)``;
    ``kill`` is a ``MagicMock`` so tests can check whether it was called.
    """

    def __init__(self, stdout=b"", stderr=b"", returncode=0):
        self.stdout, self.stderr, self.returncode = stdout, stderr, returncode
        self.kill = MagicMock()
        self.communicate = AsyncMock(return_value=(stdout, stderr))
execution.""" # Mock process with successful execution mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0) with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: result = await execute_command(["echo", "test"]) # Verify subprocess was called with correct arguments mock_exec.assert_called_once() # Verify the result contains expected fields assert "stdout" in result assert "stderr" in result assert "return_code" in result assert result["stdout"] == "test output" assert result["stderr"] == "" assert result["return_code"] == 0 @pytest.mark.asyncio async def test_execute_command_error(): """Test command execution with error.""" # Mock process with error mock_process = MockProcess(stdout=b"", stderr=b"error message", returncode=1) with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: result = await execute_command(["invalid_command"]) # Verify subprocess was called mock_exec.assert_called_once() # Verify the result contains expected fields assert "stdout" in result assert "stderr" in result assert "return_code" in result assert result["stdout"] == "" assert result["stderr"] == "error message" assert result["return_code"] == 1 @pytest.mark.asyncio async def test_execute_command_timeout(): """Test command execution timeout.""" # Mock process that will time out mock_process = MockProcess() mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError()) with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: result = await execute_command(["sleep", "100"], timeout_seconds=1) # Verify subprocess was called mock_exec.assert_called_once() # Verify process was killed mock_process.kill.assert_called_once() # Verify the result contains expected fields assert "error" in result assert "timed out" in result["error"] assert result["return_code"] == 124 # Timeout return code @pytest.mark.asyncio async def test_execute_command_exception(): """Test exception 
during command execution.""" with patch("asyncio.create_subprocess_exec", side_effect=Exception("Test exception")) as mock_exec: result = await execute_command(["echo", "test"]) # Verify subprocess was called mock_exec.assert_called_once() # Verify the result contains expected fields assert "error" in result assert "Failed to execute command" in result["error"] assert result["return_code"] == 1 @pytest.mark.asyncio async def test_execute_command_empty(): """Test execution with empty command.""" result = await execute_command([]) # Verify the result contains expected fields assert "error" in result assert "Command cannot be empty" in result["error"] assert result["return_code"] == 1 @pytest.mark.asyncio async def test_execute_command_with_working_dir(): """Test command execution with working directory.""" # Mock process with successful execution mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0) test_dir = "/test_dir" # Using a non-tmp directory for testing with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: result = await execute_command(["echo", "test"], working_dir=test_dir) # Verify subprocess was called with correct arguments mock_exec.assert_called_once() _, kwargs = mock_exec.call_args assert kwargs["cwd"] == Path(test_dir) # Verify the result contains expected fields assert result["return_code"] == 0 @pytest.mark.asyncio async def test_execute_command_with_tilde_in_working_dir(): """Test command execution with tilde in working directory.""" # Mock process with successful execution mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0) test_dir = "~/test_dir" # Using tilde in path with ( patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec, patch("pathlib.Path.expanduser", return_value=Path("/home/user/test_dir")) as mock_expanduser, ): result = await execute_command(["echo", "test"], working_dir=test_dir) # Verify expanduser was called 
class LocalMemory:
    """File-backed memory of ``[session_id, brief, detail, embedding]`` rows.

    All rows, from every session, live in one object array saved with
    ``np.save`` at ``memory_file``; file access goes through
    ``portalocker.Lock``. ``current_memory`` is an in-process snapshot of that
    array, refreshed after ``store``/``clear`` and on ``query(refresh=True)``.
    """

    @classmethod
    def new_session(cls) -> LocalMemory:
        """Return a memory bound to a newly generated random session id."""
        return cls.use_session(uuid.uuid4().hex)

    @classmethod
    def use_session(cls, session_id: str) -> LocalMemory:
        """Return a memory for ``session_id`` backed by ``Config().memory_file``."""
        config = Config()
        return cls(session_id, config.memory_file)

    def __init__(self, session_id: str, memory_file: PathLike):
        """Bind to ``memory_file``, creating it (and its parent directory) if missing."""
        self.session_id = session_id
        self.memory_file = Path(memory_file)
        self.memory_file.parent.mkdir(parents=True, exist_ok=True)
        self.memory_file.touch(exist_ok=True)
        self.current_memory: np.ndarray = self._load()

    @staticmethod
    def _empty_memory() -> np.ndarray:
        """Return an array holding zero rows of the four stored fields."""
        return np.empty((0, 4), dtype=object)

    def _load(self) -> np.ndarray:
        """Read the whole memory array from disk.

        Returns an empty array when the file is missing, empty, or unreadable;
        only the unreadable case is logged.
        """
        if not self.memory_file.exists():
            return self._empty_memory()

        try:
            with portalocker.Lock(self.memory_file, "rb") as f:
                # NOTE(security): allow_pickle=True unpickles the file content,
                # so the memory file must never come from an untrusted source.
                return np.load(f, allow_pickle=True)
        except EOFError:
            # A file that was just touched (or cleared by truncation) has no
            # data yet; that is the normal "nothing stored" state, not an error.
            return self._empty_memory()
        except Exception as e:
            logger.warning(f"Error loading memory: {e}")
            return self._empty_memory()

    def store(self, brief: str, detail: str) -> MemoryModel:
        """Append one entry for the current session and persist it.

        Args:
            brief: Short text; this is what gets embedded and searched.
            detail: Longer text stored alongside ``brief``.

        Returns:
            The stored entry, including its embedding.

        Raises:
            Exception: Any failure while embedding or writing is logged and re-raised.
        """
        try:
            # Embed before taking the lock so the file is not held locked
            # while the embedding model runs.
            embedding = embed_text(brief)
            new_row = np.array([[self.session_id, brief, detail, embedding]], dtype=object)

            # Keep the file locked for the whole read-modify-write cycle.
            with portalocker.Lock(self.memory_file, "rb+") as f:
                try:
                    current_memory = np.load(f, allow_pickle=True)
                except (ValueError, EOFError):
                    # File is empty or not a valid numpy array
                    current_memory = self._empty_memory()

                if current_memory.size == 0:
                    updated_memory = new_row
                else:
                    updated_memory = np.append(current_memory, new_row, axis=0)

                # Overwrite the file in place with the updated array.
                f.seek(0)
                f.truncate()
                np.save(f, updated_memory)
        except Exception as e:
            logger.warning(f"Error storing memory: {e}")
            raise

        self.current_memory = self._load()
        return MemoryModel(
            session_id=self.session_id,
            brief=brief,
            detail=detail,
            embedding=embedding,
        )

    def query(
        self,
        query: str,
        top_k: int = 3,
        cross_session: bool = True,
        refresh: bool = False,
    ) -> list[MemoryModel]:
        """Return the stored entries whose brief is most similar to ``query``.

        Similarity is the dot product between the query embedding and each
        stored embedding.

        Args:
            query: Text to search for.
            top_k: Maximum number of entries to return; values ``<= 0`` yield ``[]``.
            cross_session: When ``False``, only entries of this instance's
                session are considered.
            refresh: Reload the snapshot from disk before searching.

        Returns:
            Up to ``top_k`` entries, most similar first. The returned models
            do not carry the embedding.
        """
        if refresh:
            self.current_memory = self._load()

        candidates = self.current_memory
        # Guard top_k explicitly: slicing with [-0:] would return *every* row.
        if top_k <= 0 or candidates.size == 0:
            return []

        if not cross_session:
            candidates = candidates[candidates[:, 0] == self.session_id]
            if len(candidates) == 0:
                return []  # No entries for current session

        # Only embed once we know there is something to compare against.
        embedding = embed_text(query)
        similarity = np.array([np.dot(stored_embedding, embedding) for stored_embedding in candidates[:, 3]])
        best_first = np.argsort(similarity)[::-1][:top_k]

        return [
            MemoryModel(
                session_id=candidates[idx, 0],
                brief=candidates[idx, 1],
                detail=candidates[idx, 2],
            )
            for idx in best_first
        ]

    def clear(self):
        """Remove every stored entry, for all sessions sharing this file."""
        empty_memory = self._empty_memory()

        # Update the file with the empty array
        with portalocker.Lock(self.memory_file, "wb") as f:
            np.save(f, empty_memory)

        # Update the current memory
        self.current_memory = empty_memory
@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_remember(mock_get_memory, mock_memory):
    """remember() echoes the stored entry and persists it in the session memory."""
    brief, detail = "test-brief", "test-detail"
    mock_get_memory.return_value = mock_memory

    stored = remember(brief, detail)

    expected = {
        "session_id": "test-session",
        "brief": brief,
        "detail": detail,
    }
    assert stored == expected
    mock_get_memory.assert_called_once()

    # The entry must be retrievable from the backing memory afterwards.
    hits = mock_memory.query(brief)
    assert len(hits) == 1
    only_hit = hits[0]
    assert only_hit.brief == brief
    assert only_hit.detail == detail
@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_recall_cross_session(mock_get_memory, mock_memory):
    """recall() forwards the cross_session flag to the memory's query method."""
    mock_get_memory.return_value = mock_memory
    real_query = mock_memory.query

    def fake_query(query_text, top_k=3, cross_session=True):
        # Canned results: two foreign sessions when searching across sessions,
        # a single own-session entry otherwise.
        if not cross_session:
            return [MemoryModel(session_id="test-session", brief="brief-1", detail="detail-1")]
        return [
            MemoryModel(session_id=f"session-{n}", brief=f"brief-{n}", detail=f"detail-{n}")
            for n in (1, 2)
        ]

    mock_memory.query = fake_query

    # cross_session=True -> entries from both foreign sessions, in order
    across = recall("brief", cross_session=True)
    assert len(across) == 2
    assert across[0]["session_id"] == "session-1"
    assert across[1]["session_id"] == "session-2"

    # cross_session=False -> only the current session's entry
    own = recall("brief", cross_session=False)
    assert len(own) == 1
    assert own[0]["session_id"] == "test-session"

    # Put the real query method back on the fixture object
    mock_memory.query = real_query
def load_model(model_name="base"):
    """
    Return the Whisper model called ``model_name``, loading it only when needed.

    The most recently loaded model is kept in the module-level cache and is
    reused as long as the same name is requested again.

    Args:
        model_name: The name of the Whisper model to load (tiny, base, small, medium, large)

    Returns:
        The loaded Whisper model
    """
    global _model, _model_name

    # Cache hit: a model is present and it is the one being asked for.
    if _model is not None and _model_name == model_name:
        return _model

    logger.info(f"Loading Whisper model: {model_name}")
    _model = whisper.load_model(model_name)
    _model_name = model_name
    return _model
@mcp.tool(description="Get the length of an audio file in seconds.")
async def get_audio_length(
    audio_path: Annotated[str, Field(description="The path to the audio file")],
) -> dict[str, Any]:
    """Report the duration of an audio file.

    Args:
        audio_path: The path to the audio file

    Returns:
        On success, a dictionary with the duration in seconds, an ``H:MM:SS``
        rendering of it and a human-readable message. On any failure, a
        dictionary with ``error`` and ``message`` keys instead.
    """
    try:
        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file not found: {audio_path}")

        samples = whisper.load_audio(audio_path)

        # Whisper uses 16kHz audio, so sample count / 16000 is the duration.
        seconds = len(samples) / 16000
        pretty = str(datetime.timedelta(seconds=int(seconds)))
    except Exception as e:
        return {
            "error": str(e),
            "message": f"Failed to get audio length: {e!s}",
        }
    else:
        return {
            "duration_seconds": seconds,
            "formatted_duration": pretty,
            "message": f"Audio length: {pretty} ({seconds:.2f} seconds)",
        }
Args: audio_path: The path to the audio file start_time: Start time in seconds end_time: End time in seconds model_name: Whisper model name (tiny, base, small, medium, large) initial_prompt: Initial prompt to guide transcription Returns: A dictionary containing the transcribed text and time range """ try: if not os.path.exists(audio_path): raise ValueError(f"Audio file not found: {audio_path}") # Load audio to detect language if not already loaded _ = load_audio(audio_path, model_name) if _detected_language == "zh": initial_prompt = "以下是普通话的句子" elif _detected_language == "en": initial_prompt = "The following is English speech" else: initial_prompt = "" # Load model and audio (uses cached versions if already loaded) model = load_model(model_name) audio = load_audio(audio_path, model_name) # Convert times to sample indices sample_rate = 16000 # Whisper uses 16kHz audio start_sample = int(start_time * sample_rate) end_sample = int(end_time * sample_rate) # Ensure indices are within bounds audio_length = len(audio) start_sample = max(0, min(start_sample, audio_length - 1)) end_sample = max(start_sample, min(end_sample, audio_length)) # Extract the requested audio segment segment = audio[start_sample:end_sample] # If segment is too short, pad it if len(segment) < 0.5 * sample_rate: # Less than 0.5 seconds logger.warning("Audio segment is very short, results may be poor") segment = whisper.pad_or_trim(segment, 0.5 * sample_rate) # Transcribe the segment result = model.transcribe( segment, language=_detected_language, initial_prompt=initial_prompt, verbose=False, ) # Format time range for display start_formatted = str(datetime.timedelta(seconds=int(start_time))) end_formatted = str(datetime.timedelta(seconds=int(end_time))) # Extract and return the text transcribed_text = result["text"].strip() return { "text": transcribed_text, "start_time": start_time, "end_time": end_time, "time_range": f"{start_formatted} - {end_formatted}", "language": _detected_language, "message": 
class ImageRequest:
    """One image-generation job against the API: submit, poll, download, save."""

    def __init__(
        self,
        # api inputs
        prompt: str,
        name: str = "flux.1.1-pro",
        width: int | None = None,
        height: int | None = None,
        num_steps: int | None = None,
        prompt_upsampling: bool | None = None,
        seed: int | None = None,
        guidance: float | None = None,
        interval: float | None = None,
        safety_tolerance: int | None = None,
        # behavior of this class
        validate: bool = True,
        api_key: str | None = None,
    ):
        """
        Manages an image generation request to the API.

        All parameters not specified will use the API defaults.

        Args:
            prompt: Text prompt for image generation.
            width: Width of the generated image in pixels. Must be a multiple of 32.
            height: Height of the generated image in pixels. Must be a multiple of 32.
            name: Which model version to use
            num_steps: Number of steps for the image generation process.
            prompt_upsampling: Whether to perform upsampling on the prompt.
            seed: Optional seed for reproducibility.
            guidance: Guidance scale for image generation.
            interval: Interval value forwarded to the API. Rejected by validation
                for flux.1-dev and flux.1.1-pro.
            safety_tolerance: Tolerance level for input and output moderation.
                Between 0 and 6, 0 being most strict, 6 being least strict.
            validate: Run input validation
            api_key: Your API key if not provided by the environment

        Raises:
            ValueError: For invalid input, when `validate`
            ApiException: For errors raised from the API
        """
        if validate:
            # Checks run in a fixed order; the first violation is reported.
            if name not in API_ENDPOINTS:
                raise ValueError(f"Invalid model {name}")
            if width is not None and width % 32 != 0:
                raise ValueError(f"width must be divisible by 32, got {width}")
            if width is not None and not (256 <= width <= 1440):
                raise ValueError(f"width must be between 256 and 1440, got {width}")
            if height is not None and height % 32 != 0:
                raise ValueError(f"height must be divisible by 32, got {height}")
            if height is not None and not (256 <= height <= 1440):
                raise ValueError(f"height must be between 256 and 1440, got {height}")
            if num_steps is not None and not (1 <= num_steps <= 50):
                raise ValueError(f"steps must be between 1 and 50, got {num_steps}")
            if guidance is not None and not (1.5 <= guidance <= 5.0):
                # Message now states the bound that is actually enforced.
                raise ValueError(f"guidance must be between 1.5 and 5, got {guidance}")
            if interval is not None and not (1.0 <= interval <= 4.0):
                raise ValueError(f"interval must be between 1 and 4, got {interval}")
            if safety_tolerance is not None and not (0 <= safety_tolerance <= 6.0):
                # Report the offending value, not the unrelated `interval`.
                raise ValueError(f"safety_tolerance must be between 0 and 6, got {safety_tolerance}")

            if name == "flux.1-dev":
                if interval is not None:
                    raise ValueError("Interval is not supported for flux.1-dev")
            if name == "flux.1.1-pro":
                if interval is not None or num_steps is not None or guidance is not None:
                    raise ValueError("Interval, num_steps and guidance are not supported for flux.1.1-pro")

        self.name = name
        candidate_fields = {
            "prompt": prompt,
            "width": width,
            "height": height,
            "steps": num_steps,
            "prompt_upsampling": prompt_upsampling,
            "seed": seed,
            "guidance": guidance,
            "interval": interval,
            "safety_tolerance": safety_tolerance,
        }
        # Unset options are omitted so the API applies its own defaults.
        self.request_json = {key: value for key, value in candidate_fields.items() if value is not None}

        self.request_id: str | None = None
        self.result: dict | None = None
        self._image_bytes: bytes | None = None
        self._url: str | None = None
        self.api_key = os.environ.get("BFL_API_KEY") if api_key is None else api_key

    @staticmethod
    def _error_detail(response):
        """Extract the ``detail`` field of an error response, tolerating non-JSON bodies."""
        try:
            payload = response.json()
        except ValueError:
            # e.g. an HTML error page from a proxy; keep whatever text there is.
            return response.text or None
        if isinstance(payload, dict):
            return payload.get("detail")
        return None

    async def request(self):
        """
        Request to generate the image.

        Does nothing when a request id has already been obtained.

        Raises:
            ApiException: When the API answers with a status other than 200.
        """
        if self.request_id is not None:
            return

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/v1/{API_ENDPOINTS[self.name]}",
                headers={
                    "accept": "application/json",
                    "x-key": self.api_key,
                    "Content-Type": "application/json",
                },
                json=self.request_json,
            )

        # Check the status before trusting the body to be JSON.
        if response.status_code != 200:
            raise ApiException(status_code=response.status_code, detail=self._error_detail(response))
        self.request_id = response.json()["id"]

    async def retrieve(self) -> dict:
        """
        Wait for the generation to finish and retrieve response.

        Polls the result endpoint every 0.5 seconds while the job is pending.

        Raises:
            ApiException: When the response has no usable status or reports
                anything other than "Ready" / "Pending".
        """
        if self.request_id is None:
            await self.request()
        if self.result is not None:
            return self.result

        # One client for the whole polling loop instead of one per poll.
        async with httpx.AsyncClient() as client:
            while self.result is None:
                response = await client.get(
                    f"{API_URL}/v1/get_result",
                    headers={
                        "accept": "application/json",
                        "x-key": self.api_key,
                    },
                    params={
                        "id": self.request_id,
                    },
                )
                try:
                    result = response.json()
                except ValueError as err:
                    raise ApiException(status_code=response.status_code, detail=response.text or None) from err

                if "status" not in result:
                    raise ApiException(status_code=response.status_code, detail=result.get("detail"))
                elif result["status"] == "Ready":
                    self.result = result["result"]
                elif result["status"] == "Pending":
                    await asyncio.sleep(0.5)
                else:
                    raise ApiException(status_code=200, detail=f"API returned status '{result['status']}'")
        return self.result

    async def get_bytes(self) -> bytes:
        """
        Generated image as bytes.

        The download happens once; later calls return the cached bytes.

        Raises:
            ApiException: When downloading the image does not return status 200.
        """
        if self._image_bytes is None:
            url = await self.get_url()
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
            if response.status_code != 200:
                raise ApiException(status_code=response.status_code)
            self._image_bytes = response.content
        return self._image_bytes

    async def get_url(self) -> str:
        """
        Public url to retrieve the image from
        """
        if self._url is None:
            result = await self.retrieve()
            self._url = result["sample"]
        return self._url

    async def get_image(self) -> Image.Image:
        """
        Load the image as a PIL Image
        """
        bytes_data = await self.get_bytes()
        return Image.open(io.BytesIO(bytes_data))

    async def save(self, path: str) -> str:
        """
        Save the generated image to a local path

        The file extension of the image URL is appended to ``path`` when it is
        not already there. Missing parent directories are created.

        Args:
            path: The path to save the image to

        Returns:
            The full path where the image was saved
        """
        from urllib.parse import urlsplit

        url = await self.get_url()
        # Take the suffix from the URL *path* only, so a query string
        # (e.g. a signed download link) cannot leak into the file name.
        suffix = Path(urlsplit(url).path).suffix
        if suffix and not path.endswith(suffix):
            path = path + suffix
        Path(path).resolve().parent.mkdir(parents=True, exist_ok=True)
        bytes_data = await self.get_bytes()
        with open(path, "wb") as file:
            file.write(bytes_data)
        return path
# Helper function to create a mock response
def create_mock_response(status_code=200, content=MOCK_HTML_CONTENT):
    """Build a ``MagicMock`` shaped like an ``httpx.Response``.

    For status codes of 400 and above, ``raise_for_status()`` raises an
    ``HTTPStatusError`` that points back at the mock response; otherwise it
    does nothing.
    """
    response = MagicMock(spec=Response)
    response.status_code = status_code
    response.text = content

    failure = None
    if status_code >= 400:
        failure = HTTPStatusError("HTTP Error", request=MagicMock(), response=response)
    # side_effect=None leaves raise_for_status as a plain no-op mock.
    response.raise_for_status = MagicMock(side_effect=failure)
    return response
# Test save_html tool
@pytest.mark.asyncio
async def test_save_html_success():
    """save_html fetches the page and writes it to the requested path."""
    url = "https://example.com"
    target = "/tmp/test.html"

    # The network fetch and all filesystem writes are patched out.
    fetch_patch = patch("mcp_toolbox.web.tools.get_http_content", return_value=MOCK_HTML_CONTENT)
    write_patch = patch("pathlib.Path.write_text")
    mkdir_patch = patch("pathlib.Path.mkdir")

    with fetch_patch, write_patch as mock_write_text, mkdir_patch as mock_mkdir:
        outcome = await save_html(url, target)

        # Result payload
        assert outcome["success"] is True
        assert outcome["url"] == url
        assert target in outcome["output_path"]

        # Filesystem interaction
        mock_mkdir.assert_called_once_with(parents=True, exist_ok=True)
        mock_write_text.assert_called_once_with(MOCK_HTML_CONTENT)
@pytest.mark.asyncio
async def test_get_html_error():
    """get_html reports a failure dict when the fetch raises."""
    failing_fetch = patch(
        "mcp_toolbox.web.tools.get_http_content",
        side_effect=Exception("Network error"),
    )

    with failing_fetch:
        outcome = await get_html("https://example.com")

        # The exception is surfaced in the payload instead of propagating.
        assert outcome["success"] is False
        assert "error" in outcome
        assert "Network error" in outcome["error"]
# Test search_with_duckduckgo tool if available
if DUCKDUCKGO_AVAILABLE:

    @pytest.mark.asyncio
    async def test_search_with_duckduckgo_success():
        """search_with_duckduckgo returns the hits produced by the DDGS client."""
        hits = [
            {"title": f"Result {n}", "href": f"https://example.com/{n}", "body": f"Content {n}"}
            for n in (1, 2)
        ]

        ddgs_instance = MagicMock()
        ddgs_instance.text.return_value = hits

        # Replace both the DDGS class and the thread off-loading helper.
        ddgs_patch = patch("mcp_toolbox.web.tools.DDGS", return_value=ddgs_instance)
        thread_patch = patch("mcp_toolbox.web.tools.anyio.to_thread.run_sync", return_value=hits)

        with ddgs_patch, thread_patch:
            results = await search_with_duckduckgo("test query")

            assert results == hits

    @pytest.mark.asyncio
    async def test_search_with_duckduckgo_no_results():
        """search_with_duckduckgo reports a failure dict when nothing is found."""
        no_hits = []

        ddgs_instance = MagicMock()
        ddgs_instance.text.return_value = no_hits

        # Replace both the DDGS class and the thread off-loading helper.
        ddgs_patch = patch("mcp_toolbox.web.tools.DDGS", return_value=ddgs_instance)
        thread_patch = patch("mcp_toolbox.web.tools.anyio.to_thread.run_sync", return_value=no_hits)

        with ddgs_patch, thread_patch:
            outcome = await search_with_duckduckgo("test query")

            assert outcome["success"] is False
            assert "error" in outcome
            assert "No search results found" in outcome["error"]
-------------------------------------------------------------------------------- ``` # MCP-Toolbox Development Guide for LLMs This guide is designed to help you (an LLM) effectively contribute to the mcp-toolbox project. It provides essential information about the project structure, development workflow, and best practices. ## Project Overview MCP-Toolbox is a Python package that provides tools for enhancing LLMs through the Model Context Protocol (MCP). The project implements various API integrations as MCP tools, allowing LLMs to interact with external services. ### Key Components - **mcp_toolbox/app.py**: Initializes the FastMCP server - **mcp_toolbox/cli.py**: Command-line interface for running the MCP server - **mcp_toolbox/config.py**: Configuration management using Pydantic - **mcp_toolbox/figma/**: Figma API integration tools - **tests/**: Test files for the project ## Environment Setup Always help the user set up a proper development environment using `uv`. This is the preferred package manager for this project. ### Setting Up the Environment ```bash # Install uv if not already installed curl -LsSf https://astral.sh/uv/install.sh | sh # For macOS/Linux # or powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" # For Windows # Clone the repository (if not already done) git clone https://github.com/username/mcp-toolbox.git cd mcp-toolbox # Create and activate a virtual environment uv venv source .venv/bin/activate # For macOS/Linux # or .venv\Scripts\activate # For Windows # Install the package in development mode uv pip install -e . # Install development dependencies uv pip install -e ".[dev]" ``` ## GitHub Workflow Always follow proper GitHub workflow when making changes: 1. **Create a branch with a descriptive name**: ```bash # Assume the user already has their own fork git checkout -b feature/add-spotify-integration ``` 2. **Make your changes**: Implement the requested features or fixes 3. 
class SpotifyApiClient:
    """Minimal async client for the Spotify Web API using client credentials."""

    BASE_URL = "https://api.spotify.com/v1"
    AUTH_URL = "https://accounts.spotify.com/api/token"
    # Refresh a little early so a token cannot expire in the middle of a request.
    TOKEN_REFRESH_MARGIN_SECONDS = 30.0

    def __init__(self):
        self.config = Config()
        self.access_token = None
        # time.monotonic() value after which the cached token must be replaced.
        self._token_expires_at = 0.0

    async def get_access_token(self) -> str:
        """Get or refresh the Spotify access token.

        A previously fetched token is reused until shortly before the lifetime
        reported by the token endpoint (``expires_in``) has elapsed.

        Raises:
            ValueError: If the Spotify client id or secret is not configured.
            httpx.HTTPStatusError: If the token endpoint rejects the request.
        """
        import time

        if not self.config.spotify_client_id or not self.config.spotify_client_secret:
            raise ValueError(
                "Spotify credentials not provided. Set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables."
            )

        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.AUTH_URL,
                data={"grant_type": "client_credentials"},
                auth=(self.config.spotify_client_id, self.config.spotify_client_secret),
            )
        response.raise_for_status()
        data = response.json()

        self.access_token = data["access_token"]
        # A missing expires_in yields a lifetime of 0, i.e. no caching at all.
        lifetime = float(data.get("expires_in", 0)) - self.TOKEN_REFRESH_MARGIN_SECONDS
        self._token_expires_at = time.monotonic() + max(0.0, lifetime)
        return self.access_token

    @staticmethod
    def _describe_error(error: "httpx.HTTPStatusError") -> Any:
        """Return the API's JSON error body, or a status/message dict if there is none."""
        fallback = {"status": error.response.status_code, "error": str(error)}
        if not error.response.content:
            return fallback
        try:
            return error.response.json()
        except ValueError:
            # Error body is not JSON (e.g. an HTML page from a gateway).
            return fallback

    async def make_request(
        self,
        path: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Make a request to the Spotify API.

        Args:
            path: Path below ``BASE_URL``, starting with ``/``.
            method: HTTP method; only ``"GET"`` is supported.
            params: Optional query parameters.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: For an unsupported method, an HTTP error status, or a
                transport-level request error.
        """
        # Reject unsupported methods before spending a token request on them.
        if method != "GET":
            raise ValueError(f"Unsupported HTTP method: {method}")

        token = await self.get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        url = f"{self.BASE_URL}{path}"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            spotify_error = self._describe_error(e)
            raise ValueError(f"Spotify API error: {spotify_error}") from e
        except httpx.RequestError as e:
            raise ValueError(f"Request error: {e!s}") from e
@mcp.tool(
    description="Get an artist's top tracks. Args: artist_id (required, The Spotify ID of the artist), market (optional, An ISO 3166-1 alpha-2 country code)"
)
async def spotify_get_artist_top_tracks(artist_id: str, market: str = "US") -> Dict[str, Any]:
    """Fetch the top tracks of an artist.

    Args:
        artist_id: Spotify ID of the artist.
        market: ISO 3166-1 alpha-2 country code passed as the ``market`` query parameter (default: US).

    Returns:
        Whatever ``api_client.make_request`` returns for the top-tracks endpoint.
    """
    endpoint = f"/artists/{artist_id}/top-tracks"
    top_tracks = await api_client.make_request(endpoint, params={"market": market})
    return top_tracks
# Helper function to load mock data
def load_mock_data(filename):
    """Load a JSON fixture from the ``mock/spotify`` directory next to the test packages.

    The directory is resolved relative to this file (two levels up, then
    ``mock/spotify``). If the requested fixture does not exist, a placeholder
    containing ``{"mock": "data"}`` is written first so the test can still run.

    Args:
        filename: Name of the fixture file inside the mock directory.

    Returns:
        The decoded JSON content of the fixture.
    """
    mock_dir = Path(__file__).parent.parent / "mock" / "spotify"
    file_path = mock_dir / filename

    if not file_path.exists():
        # The mock directory itself may be missing (empty directories are not
        # kept by git), so create it before writing the placeholder.
        mock_dir.mkdir(parents=True, exist_ok=True)
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump({"mock": "data"}, f)

    # JSON fixtures are UTF-8; do not depend on the platform default encoding.
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)
# API Client
class FigmaApiClient:
    """Thin async wrapper around the Figma REST API.

    The API key is read from ``Config().figma_api_key`` and sent in the
    ``X-Figma-Token`` header on every request.
    """

    BASE_URL = "https://api.figma.com/v1"

    def __init__(self):
        self.config = Config()

    async def get_access_token(self) -> str:
        """Return the configured Figma API key.

        Raises:
            ValueError: If no API key is configured.
        """
        if not self.config.figma_api_key:
            raise ValueError("No Figma API key provided. Set the FIGMA_API_KEY environment variable.")
        return self.config.figma_api_key

    async def make_request(self, path: str, method: str = "GET", data: Any = None) -> dict[str, Any]:
        """Send a request to the Figma API and return the decoded JSON body.

        Args:
            path: API path (including any query string) appended to ``BASE_URL``.
            method: One of ``"GET"``, ``"POST"`` or ``"DELETE"``.
            data: JSON-serializable body, only sent for ``"POST"``.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: On a missing API key, an unsupported method, an HTTP
                error status, or a transport-level request failure.
        """
        token = await self.get_access_token()

        async with httpx.AsyncClient(
            transport=httpx.AsyncHTTPTransport(retries=3),
            timeout=30,
        ) as client:
            headers = {"X-Figma-Token": token}
            url = f"{self.BASE_URL}{path}"

            try:
                if method == "GET":
                    response = await client.get(url, headers=headers)
                elif method == "POST":
                    response = await client.post(url, headers=headers, json=data)
                elif method == "DELETE":
                    response = await client.delete(url, headers=headers)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                raise ValueError(f"Figma API error: {self._error_detail(e)}") from e
            except httpx.RequestError as e:
                raise ValueError(f"Request error: {e!s}") from e

    @staticmethod
    def _error_detail(error: Exception) -> str:
        """Extract a readable message from an HTTP status error.

        Prefers the ``err`` (then ``message``) field of a JSON object body and
        falls back to ``str(error)`` when the body is empty, is not valid JSON,
        or is not a JSON object.
        """
        fallback = str(error)
        response = getattr(error, "response", None)
        if response is None or not response.content:
            return fallback
        try:
            payload = response.json()
        except ValueError:
            # Non-JSON error body (e.g. an HTML page from a proxy).
            return fallback
        if not isinstance(payload, dict):
            return fallback
        return str(payload.get("err", payload.get("message", fallback)))

    def build_query_string(self, params: dict[str, Any]) -> str:
        """Build a URL query string (including the leading ``?``) from ``params``.

        ``None`` values are dropped, lists are joined with commas, and every
        key and value is percent-encoded so characters such as ``&``, ``=``,
        ``#`` or spaces cannot corrupt the URL. Commas, colons and semicolons
        are left as-is because they appear in comma-joined ID lists and in
        node IDs such as ``1:2``.

        Args:
            params: Mapping of query parameter names to values.

        Returns:
            ``""`` when no parameter has a value, otherwise ``"?k1=v1&k2=v2"``.
        """
        from urllib.parse import quote

        query_parts = []
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, list):
                value = ",".join(map(str, value))
            query_parts.append(f"{quote(str(key))}={quote(str(value), safe=',:;')}")

        if not query_parts:
            return ""
        return "?" + "&".join(query_parts)
@mcp.tool(description="Get specific nodes from a Figma file.")
async def figma_get_file_nodes(
    file_key: Annotated[str, Field(description="The key of the file to get nodes from")],
    node_ids: Annotated[list[str], Field(description="Array of node IDs to get")],
    depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None,
    version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None,
) -> dict[str, Any]:
    """Fetch selected nodes of a Figma file and store the response in the local cache.

    Returns a dict with the cache ``file_path`` and a ``message`` when the
    response could be written to the cache; otherwise the raw API response.
    """
    query = api_client.build_query_string({"ids": node_ids, "depth": depth, "version": version})
    nodes_payload = await api_client.make_request(f"/files/{file_key}/nodes{query}")

    try:
        # Millisecond timestamp is part of the cache file name.
        timestamp_ms = int(time.time() * 1000)
        cached_path = cache_manager.save_to_cache(f"file_nodes_{file_key}_{timestamp_ms}.json", nodes_payload)
    except Exception:
        # Caching is best-effort: hand back the API response itself.
        return nodes_payload

    return {
        "file_path": cached_path,
        "message": "File nodes data saved to local cache. Use this file path to access the complete data.",
    }
@mcp.tool(description="Get projects for a team.")
async def figma_get_team_projects(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """List the projects of a team, optionally paginated via ``page_size`` and ``cursor``."""
    pagination = api_client.build_query_string({"page_size": page_size, "cursor": cursor})
    endpoint = f"/teams/{team_id}/projects" + pagination
    return await api_client.make_request(endpoint)
@mcp.tool(description="Get components for a team.")
async def figma_get_team_components(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Request the components published by a team.

    ``page_size`` and ``cursor`` are forwarded as query parameters when given.
    """
    paging_params = {"page_size": page_size, "cursor": cursor}
    suffix = api_client.build_query_string(paging_params)
    components = await api_client.make_request(f"/teams/{team_id}/components{suffix}")
    return components
@mcp.tool(description="Get styles for a team.")
async def figma_get_team_styles(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Request the styles of a team; pagination arguments are optional."""
    base_path = f"/teams/{team_id}/styles"
    query = api_client.build_query_string({"page_size": page_size, "cursor": cursor})
    return await api_client.make_request(f"{base_path}{query}")
# Helper function to load mock data
def load_mock_data(filename):
    """Load a JSON fixture from the ``mock/figma`` directory next to the test packages.

    The directory is resolved relative to this file (two levels up, then
    ``mock/figma``). If the requested fixture does not exist, a placeholder
    containing ``{"mock": "data"}`` is written first so the test can still run.

    Args:
        filename: Name of the fixture file inside the mock directory.

    Returns:
        The decoded JSON content of the fixture.
    """
    mock_dir = Path(__file__).parent.parent / "mock" / "figma"
    file_path = mock_dir / filename

    if not file_path.exists():
        # Make sure the directory exists; otherwise writing the placeholder
        # fails with FileNotFoundError instead of producing the fallback data.
        mock_dir.mkdir(parents=True, exist_ok=True)
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump({"mock": "data"}, f)

    # JSON fixtures are UTF-8; do not depend on the platform default encoding.
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)
# Test get_file function
@pytest.mark.asyncio
async def test_get_file(mock_make_request, mock_save_to_cache):
    """figma_get_file requests /files/<key> and reports the cache location."""
    cached_path = "/mock/path/to/cache/file.json"

    # Required argument only: no query string is appended.
    minimal = await figma_get_file("test_file_key")
    mock_make_request.assert_called_once_with("/files/test_file_key")
    mock_save_to_cache.assert_called_once()

    # The tool answers with the cache location instead of the raw payload.
    assert "file_path" in minimal
    assert "message" in minimal
    assert minimal["file_path"] == cached_path

    # Start the second scenario from clean mocks.
    mock_make_request.reset_mock()
    mock_save_to_cache.reset_mock()

    # Every optional argument shows up in the query string, in declaration order.
    await figma_get_file("test_file_key", version="123", depth=2, branch_data=True)
    expected_path = "/files/test_file_key?version=123&depth=2&branch_data=True"
    mock_make_request.assert_called_once_with(expected_path)
# Test get_image function
@pytest.mark.asyncio
async def test_get_image(mock_make_request):
    """figma_get_image requests /images/<key> with the expected query string."""
    # Required arguments only: just the comma-joined ids are sent.
    await figma_get_image("test_file_key", ["node1", "node2"])
    mock_make_request.assert_called_once_with("/images/test_file_key?ids=node1,node2")

    mock_make_request.reset_mock()

    # All rendering options set; ``format_type`` is sent as ``format``.
    render_options = {
        "scale": 2.0,
        "format_type": "png",
        "svg_include_id": True,
        "svg_simplify_stroke": True,
        "use_absolute_bounds": True,
    }
    await figma_get_image("test_file_key", ["node1", "node2"], **render_options)

    expected_path = "/images/test_file_key?ids=node1,node2&scale=2.0&format=png&svg_include_id=True&svg_simplify_stroke=True&use_absolute_bounds=True"
    mock_make_request.assert_called_once_with(expected_path)
# Test get_team_projects function
@pytest.mark.asyncio
async def test_get_team_projects(mock_make_request):
    """figma_get_team_projects builds /teams/<id>/projects with optional pagination."""
    scenarios = [
        # (keyword arguments, path expected by make_request)
        ({}, "/teams/team1/projects"),
        ({"page_size": 10, "cursor": "cursor1"}, "/teams/team1/projects?page_size=10&cursor=cursor1"),
    ]

    for kwargs, expected_path in scenarios:
        # Each scenario starts from a clean call history.
        mock_make_request.reset_mock()
        await figma_get_team_projects("team1", **kwargs)
        mock_make_request.assert_called_once_with(expected_path)
# Test get_team_components function
@pytest.mark.asyncio
async def test_get_team_components(mock_make_request):
    """figma_get_team_components builds /teams/<id>/components with optional pagination."""
    team = "team1"

    # Without pagination arguments the bare endpoint is requested.
    await figma_get_team_components(team)
    mock_make_request.assert_called_once_with("/teams/team1/components")

    mock_make_request.reset_mock()

    # With pagination arguments both end up in the query string.
    await figma_get_team_components(team, page_size=10, cursor="cursor1")
    paginated_path = "/teams/team1/components?page_size=10&cursor=cursor1"
    mock_make_request.assert_called_once_with(paginated_path)
# Test get_team_styles function
@pytest.mark.asyncio
async def test_get_team_styles(mock_make_request):
    """figma_get_team_styles builds /teams/<id>/styles with optional pagination."""
    plain_path = "/teams/team1/styles"
    paginated_path = "/teams/team1/styles?page_size=10&cursor=cursor1"

    # Minimal call: team id only.
    await figma_get_team_styles("team1")
    mock_make_request.assert_called_once_with(plain_path)

    # Clear the recorded call before the fully parameterized scenario.
    mock_make_request.reset_mock()

    await figma_get_team_styles("team1", page_size=10, cursor="cursor1")
    mock_make_request.assert_called_once_with(paginated_path)
@mcp.tool(description="Read file content.") async def read_file_content( path: Annotated[str, Field(description="Path to the file to read")], encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", chunk_size: Annotated[ int, Field(default=1000000, description="Size of each chunk in bytes, default: 1MB"), ] = 1000000, chunk_index: Annotated[int, Field(default=0, description="Index of the chunk to retrieve, 0-based")] = 0, ) -> dict[str, Any]: """Read content from a file, with support for chunked reading for large files. Args: path: Path to the file to read encoding: Optional. File encoding (default: utf-8) chunk_size: Optional. Size of each chunk in bytes (default: 1000000, which about 1MB) chunk_index: Optional. Index of the chunk to retrieve, 0-based (default: 0) Returns: Dictionary containing content and metadata, including chunking information """ try: file_path = Path(path).expanduser() if not file_path.exists(): return { "error": f"File not found: {path}", "content": "", "success": False, } if not file_path.is_file(): return { "error": f"Path is not a file: {path}", "content": "", "success": False, } # Get file stats stats = file_path.stat() file_size = stats.st_size # Calculate total chunks total_chunks = (file_size + chunk_size - 1) // chunk_size # Ceiling division # Validate chunk_index if chunk_index < 0 or (file_size > 0 and chunk_index >= total_chunks): return { "error": f"Invalid chunk index: {chunk_index}. 
Valid range is 0 to {total_chunks - 1}", "content": "", "success": False, "total_chunks": total_chunks, "file_size": file_size, } # Calculate start and end positions for the chunk start_pos = chunk_index * chunk_size end_pos = min(start_pos + chunk_size, file_size) chunk_actual_size = end_pos - start_pos # Read the specified chunk content = "" with open(file_path, "rb") as f: f.seek(start_pos) chunk_bytes = f.read(chunk_actual_size) try: # Try to decode as text content = chunk_bytes.decode(encoding, errors="replace") except UnicodeDecodeError: # If decoding fails, return base64 encoded binary data import base64 content = base64.b64encode(chunk_bytes).decode("ascii") encoding = f"base64 (original: {encoding})" return { "content": content, "size": file_size, "chunk_size": chunk_size, "chunk_index": chunk_index, "chunk_actual_size": chunk_actual_size, "total_chunks": total_chunks, "is_last_chunk": chunk_index == total_chunks - 1, "encoding": encoding, "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), "success": True, } except UnicodeDecodeError: return { "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", "content": "", "success": False, } except Exception as e: return { "error": f"Failed to read file: {e!s}", "content": "", "success": False, } @mcp.tool(description="Write content to a file.") async def write_file_content( path: Annotated[str, Field(description="Path to the file to write")], content: Annotated[str, Field(description="Content to write")], encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", append: Annotated[bool, Field(default=False, description="Whether to append to the file")] = False, ) -> dict[str, Any]: """Write content to a file. Args: path: Path to the file to write content: Content to write to the file encoding: Optional. File encoding (default: utf-8) append: Optional. 
Whether to append to the file (default: False) Returns: Dictionary containing success status and metadata """ try: file_path = Path(path).expanduser() # Create parent directories if they don't exist file_path.parent.mkdir(parents=True, exist_ok=True) # Write content to file mode = "a" if append else "w" with open(file_path, mode, encoding=encoding) as f: f.write(content) # Get file stats stats = file_path.stat() return { "path": str(file_path), "size": stats.st_size, "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), "success": True, } except Exception as e: return { "error": f"Failed to write file: {e!s}", "path": path, "success": False, } @mcp.tool(description="Replace content in a file using regular expressions.") async def replace_in_file( path: Annotated[str, Field(description="Path to the file")], pattern: Annotated[ str, Field( description="Python regular expression pattern (re module). Supports groups, character classes, quantifiers, etc. Examples: '[a-z]+' for lowercase words, '\\d{3}-\\d{4}' for number patterns. Remember to escape backslashes." ), ], replacement: Annotated[str, Field(description="Replacement string")], encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", count: Annotated[int, Field(default=0, description="Maximum number of replacements")] = 0, ) -> dict[str, Any]: """Replace content in a file using regular expressions. Args: path: Path to the file pattern: Regular expression pattern replacement: Replacement string encoding: Optional. File encoding (default: utf-8) count: Optional. 
Maximum number of replacements (default: 0, which means all occurrences) Returns: Dictionary containing success status and replacement information """ try: file_path = Path(path).expanduser() if not file_path.exists(): return { "error": f"File not found: {path}", "success": False, "replacements": 0, } if not file_path.is_file(): return { "error": f"Path is not a file: {path}", "success": False, "replacements": 0, } # Read file content with open(file_path, encoding=encoding) as f: content = f.read() # Compile regex pattern try: regex = re.compile(pattern) except re.error as e: return { "error": f"Invalid regular expression: {e!s}", "success": False, "replacements": 0, } # Replace content new_content, replacements = regex.subn(replacement, content, count=count) if replacements > 0: # Write updated content back to file with open(file_path, "w", encoding=encoding) as f: f.write(new_content) return { "path": str(file_path), "replacements": replacements, "success": True, } except UnicodeDecodeError: return { "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", "success": False, "replacements": 0, } except Exception as e: return { "error": f"Failed to replace content: {e!s}", "success": False, "replacements": 0, } def _format_mode(mode: int) -> str: """Format file mode into a string representation. 
Args: mode: File mode as an integer Returns: String representation of file permissions """ result = "" # File type if stat.S_ISDIR(mode): result += "d" elif stat.S_ISLNK(mode): result += "l" else: result += "-" # User permissions result += "r" if mode & stat.S_IRUSR else "-" result += "w" if mode & stat.S_IWUSR else "-" result += "x" if mode & stat.S_IXUSR else "-" # Group permissions result += "r" if mode & stat.S_IRGRP else "-" result += "w" if mode & stat.S_IWGRP else "-" result += "x" if mode & stat.S_IXGRP else "-" # Other permissions result += "r" if mode & stat.S_IROTH else "-" result += "w" if mode & stat.S_IWOTH else "-" result += "x" if mode & stat.S_IXOTH else "-" return result def _get_file_info(path: Path) -> dict[str, Any]: """Get detailed information about a file or directory. Args: path: Path to the file or directory Returns: Dictionary containing file information """ stats = path.stat() # Format timestamps mtime = datetime.fromtimestamp(stats.st_mtime).isoformat() ctime = datetime.fromtimestamp(stats.st_ctime).isoformat() atime = datetime.fromtimestamp(stats.st_atime).isoformat() # Get file type if path.is_dir(): file_type = "directory" elif path.is_symlink(): file_type = "symlink" else: file_type = "file" # Format size size = stats.st_size size_str = f"{size} bytes" if size >= 1024: size_str = f"{size / 1024:.2f} KB" if size >= 1024 * 1024: size_str = f"{size / (1024 * 1024):.2f} MB" if size >= 1024 * 1024 * 1024: size_str = f"{size / (1024 * 1024 * 1024):.2f} GB" return { "name": path.name, "path": str(path), "type": file_type, "size": size, "size_formatted": size_str, "permissions": _format_mode(stats.st_mode), "mode": stats.st_mode, "owner": stats.st_uid, "group": stats.st_gid, "created": ctime, "modified": mtime, "accessed": atime, } @mcp.tool(description="List directory contents with detailed information.") async def list_directory( # noqa: C901 path: Annotated[str, Field(description="Directory path")], recursive: Annotated[bool, 
Field(default=False, description="Whether to list recursively")] = False, max_depth: Annotated[int, Field(default=-1, description="Maximum recursion depth")] = -1, include_hidden: Annotated[bool, Field(default=False, description="Whether to include hidden files")] = False, ignore_patterns: Annotated[ list[str] | None, Field( default=[ "node_modules", "dist", "build", "public", "static", ".next", ".git", ".vscode", ".idea", ".DS_Store", ".env", ".venv", ], description="Glob patterns to ignore (e.g. ['node_modules', '*.tmp'])", ), ] = None, ) -> dict[str, Any]: """List directory contents with detailed information. Args: path: Directory path recursive: Optional. Whether to list recursively (default: False) max_depth: Optional. Maximum recursion depth (default: -1, which means no limit) include_hidden: Optional. Whether to include hidden files (default: False) ignore_patterns: Optional. Glob patterns to ignore (default: ['node_modules', 'dist', 'build', 'public', 'static', '.next', '.git', '.vscode', '.idea', '.DS_Store', '.env', '.venv']) Returns: Dictionary containing directory contents and metadata """ ignore_patterns = ( ignore_patterns if ignore_patterns is not None else [ "node_modules", "dist", "build", "public", "static", ".next", ".git", ".vscode", ".idea", ".DS_Store", ".env", ".venv", ] ) try: dir_path = Path(path).expanduser() if not dir_path.exists(): return { "error": f"Directory not found: {path}", "entries": [], "success": False, } if not dir_path.is_dir(): return { "error": f"Path is not a directory: {path}", "entries": [], "success": False, } entries = [] # Import fnmatch for pattern matching import fnmatch def should_ignore(path: Path) -> bool: """Check if a path should be ignored based on ignore patterns. 
Args: path: Path to check Returns: True if the path should be ignored, False otherwise """ if not ignore_patterns: return False return any(fnmatch.fnmatch(path.name, pattern) for pattern in ignore_patterns) def process_directory(current_path: Path, current_depth: int = 0) -> None: """Process a directory and its contents recursively. Args: current_path: Path to the current directory current_depth: Current recursion depth """ nonlocal entries # Check if we've reached the maximum depth if max_depth >= 0 and current_depth > max_depth: return try: # List directory contents for item in current_path.iterdir(): # Skip hidden files if not included if not include_hidden and item.name.startswith("."): continue # Skip ignored patterns if should_ignore(item): continue # Get file information file_info = _get_file_info(item) file_info["depth"] = current_depth entries.append(file_info) # Recursively process subdirectories if recursive and item.is_dir(): process_directory(item, current_depth + 1) except PermissionError: # Add an entry indicating permission denied entries.append({ "name": current_path.name, "path": str(current_path), "type": "directory", "error": "Permission denied", "depth": current_depth, }) # Start processing from the root directory process_directory(dir_path) return { "path": str(dir_path), "entries": entries, "count": len(entries), "success": True, } except Exception as e: return { "error": f"Failed to list directory: {e!s}", "entries": [], "success": False, } ```