This is page 1 of 2. Use http://codebase.md/ai-zerolab/mcp-toolbox?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── actions
│   │   └── setup-python-env
│   │       └── action.yml
│   └── workflows
│       ├── main.yml
│       ├── on-release-main.yml
│       └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .vscode
│   └── settings.json
├── codecov.yaml
├── CONTRIBUTING.md
├── Dockerfile
├── docs
│   ├── index.md
│   └── modules.md
├── generate_config_template.py
├── LICENSE
├── llms.txt
├── Makefile
├── mcp_toolbox
│   ├── __init__.py
│   ├── app.py
│   ├── audio
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── cli.py
│   ├── command_line
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── config.py
│   ├── enhance
│   │   ├── __init__.py
│   │   ├── memory.py
│   │   └── tools.py
│   ├── figma
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── file_ops
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── flux
│   │   ├── __init__.py
│   │   ├── api.py
│   │   └── tools.py
│   ├── log.py
│   ├── markitdown
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── web
│   │   ├── __init__.py
│   │   └── tools.py
│   └── xiaoyuzhoufm
│       ├── __init__.py
│       └── tools.py
├── mkdocs.yml
├── pyproject.toml
├── pytest.ini
├── README.md
├── smithery.yaml
├── tests
│   ├── audio
│   │   └── test_audio_tools.py
│   ├── command_line
│   │   └── test_command_line_tools.py
│   ├── enhance
│   │   ├── test_enhance_tools.py
│   │   └── test_memory.py
│   ├── figma
│   │   └── test_figma_tools.py
│   ├── file_ops
│   │   └── test_file_ops_tools.py
│   ├── flux
│   │   └── test_flux_tools.py
│   ├── markitdown
│   │   └── test_markitdown_tools.py
│   ├── mock
│   │   └── figma
│   │       ├── delete_comment.json
│   │       ├── get_comments.json
│   │       ├── get_component.json
│   │       ├── get_file_components.json
│   │       ├── get_file_nodes.json
│   │       ├── get_file_styles.json
│   │       ├── get_file.json
│   │       ├── get_image_fills.json
│   │       ├── get_image.json
│   │       ├── get_project_files.json
│   │       ├── get_style.json
│   │       ├── get_team_component_sets.json
│   │       ├── get_team_components.json
│   │       ├── get_team_projects.json
│   │       ├── get_team_styles.json
│   │       └── post_comment.json
│   ├── web
│   │   └── test_web_tools.py
│   └── xiaoyuzhoufm
│       └── test_xiaoyuzhoufm_tools.py
├── tox.ini
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: "v5.0.0"
    hooks:
      - id: check-case-conflict
      - id: check-merge-conflict
      - id: check-toml
      - id: check-yaml
      - id: check-json
        exclude: ^.devcontainer/devcontainer.json
      - id: pretty-format-json
        exclude: ^.devcontainer/devcontainer.json
        args: [--autofix]
      - id: end-of-file-fixer
      - id: trailing-whitespace

  - repo: https://github.com/executablebooks/mdformat
    rev: 0.7.22
    hooks:
      - id: mdformat
        additional_dependencies:
          [mdformat-gfm, mdformat-frontmatter, mdformat-footnote]

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: "v0.11.8"
    hooks:
      - id: ruff
        args: [--exit-non-zero-on-fix]
      - id: ruff-format

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
docs/source

# From https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# Vscode config files
# .vscode/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# mcp-toolbox

[![Release](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox)
[![Build status](https://img.shields.io/github/actions/workflow/status/ai-zerolab/mcp-toolbox/main.yml?branch=main)](https://github.com/ai-zerolab/mcp-toolbox/actions/workflows/main.yml?query=branch%3Amain)
[![codecov](https://codecov.io/gh/ai-zerolab/mcp-toolbox/branch/main/graph/badge.svg)](https://codecov.io/gh/ai-zerolab/mcp-toolbox)
[![Commit activity](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox)
[![License](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox)

A comprehensive toolkit for enhancing LLM capabilities through the Model Context Protocol (MCP). This package provides a collection of tools that allow LLMs to interact with external services and APIs, extending their functionality beyond text generation.

- **GitHub repository**: <https://github.com/ai-zerolab/mcp-toolbox/>
- **Documentation** (WIP): <https://ai-zerolab.github.io/mcp-toolbox/>

## Features

> \*nix is our main target, but Windows should work too.

- **Command Line Execution**: Execute any command line instruction through LLM
- **Figma Integration**: Access Figma files, components, styles, and more
- **Extensible Architecture**: Easily add new API integrations
- **MCP Protocol Support**: Compatible with Claude Desktop and other MCP-enabled LLMs
- **Comprehensive Testing**: Well-tested codebase with high test coverage

## Installation

### Using uv (Recommended)

We recommend using [uv](https://github.com/astral-sh/uv) to manage your environment.

```bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh  # For macOS/Linux
# or
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"  # For Windows
```

Then you can use `uvx "mcp-toolbox@latest" stdio` to run the latest version of the MCP server. **Audio and memory tools are not included in the default installation**; you can include them by installing the `all` extra:

> Use `[audio]` for audio tools, `[memory]` for memory tools, or `[all]` for all tools.

```bash
uvx "mcp-toolbox[all]@latest" stdio
```

### Installing via Smithery

To install Toolbox for LLM Enhancement for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ai-zerolab/mcp-toolbox):

```bash
npx -y @smithery/cli install @ai-zerolab/mcp-toolbox --client claude
```

### Using pip

```bash
pip install "mcp-toolbox[all]"
```

You can then use `mcp-toolbox stdio` to run the MCP server.

## Configuration

### Environment Variables

The following environment variables can be configured:

- `FIGMA_API_KEY`: API key for Figma integration
- `TAVILY_API_KEY`: API key for Tavily integration
- `DUCKDUCKGO_API_KEY`: API key for DuckDuckGo integration
- `BFL_API_KEY`: API key for Flux image generation API
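
These variables are read at startup by the pydantic-settings `Config` model in `mcp_toolbox/config.py`, where each field is populated from the environment variable of the same name. A minimal sketch of that mapping:

```python
# Sketch: pydantic-settings populates Config fields from matching
# environment variables (case-insensitive by default), so FIGMA_API_KEY
# ends up on config.figma_api_key.
import os

from mcp_toolbox.config import Config

os.environ["FIGMA_API_KEY"] = "your-figma-api-key"  # normally set by your MCP client

config = Config()
assert config.figma_api_key == "your-figma-api-key"
```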

### Memory Storage

Memory tools store data in the following locations:

- **macOS**: `~/Documents/zerolab/mcp-toolbox/memory` (syncs across devices via iCloud)
- **Other platforms**: `~/.zerolab/mcp-toolbox/memory`

### Full Configuration

To use mcp-toolbox with Claude Desktop/Cline/Cursor/..., add the following to your configuration file:

```json
{
  "mcpServers": {
    "zerolab-toolbox": {
      "command": "uvx",
      "args": ["--prerelease=allow", "mcp-toolbox@latest", "stdio"],
      "env": {
        "FIGMA_API_KEY": "your-figma-api-key",
        "TAVILY_API_KEY": "your-tavily-api-key",
        "DUCKDUCKGO_API_KEY": "your-duckduckgo-api-key",
        "BFL_API_KEY": "your-bfl-api-key"
      }
    }
  }
}
```

For full features:

```json
{
  "mcpServers": {
    "zerolab-toolbox": {
      "command": "uvx",
      "args": [
        "--prerelease=allow",
        "--python=3.12",
        "mcp-toolbox[all]@latest",
        "stdio"
      ],
      "env": {
        "FIGMA_API_KEY": "your-figma-api-key",
        "TAVILY_API_KEY": "your-tavily-api-key",
        "DUCKDUCKGO_API_KEY": "your-duckduckgo-api-key",
        "BFL_API_KEY": "your-bfl-api-key"
      }
    }
  }
}
```

You can generate a debug configuration template using:

```bash
uv run generate_config_template.py
```

## Available Tools

### Command Line Tools

| Tool              | Description                        |
| ----------------- | ---------------------------------- |
| `execute_command` | Execute a command line instruction |
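
`execute_command` (see `mcp_toolbox/command_line/tools.py`) takes the command as a list of strings, plus an optional timeout and working directory. Since the decorated function remains directly callable, a quick smoke test outside an MCP client looks like this:

```python
# Direct invocation of the tool function, outside an MCP client.
import asyncio

from mcp_toolbox.command_line.tools import execute_command

result = asyncio.run(execute_command(["echo", "hello"], timeout_seconds=10))
print(result["stdout"], result["return_code"])  # -> hello\n 0
```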

### File Operations Tools

| Tool                 | Description                                         |
| -------------------- | --------------------------------------------------- |
| `read_file_content`  | Read content from a file                            |
| `write_file_content` | Write content to a file                             |
| `replace_in_file`    | Replace content in a file using regular expressions |
| `list_directory`     | List directory contents with detailed information   |

### Figma Tools

| Tool                            | Description                              |
| ------------------------------- | ---------------------------------------- |
| `figma_get_file`                | Get a Figma file by key                  |
| `figma_get_file_nodes`          | Get specific nodes from a Figma file     |
| `figma_get_image`               | Get images for nodes in a Figma file     |
| `figma_get_image_fills`         | Get URLs for images used in a Figma file |
| `figma_get_comments`            | Get comments on a Figma file             |
| `figma_post_comment`            | Post a comment on a Figma file           |
| `figma_delete_comment`          | Delete a comment from a Figma file       |
| `figma_get_team_projects`       | Get projects for a team                  |
| `figma_get_project_files`       | Get files for a project                  |
| `figma_get_team_components`     | Get components for a team                |
| `figma_get_file_components`     | Get components from a file               |
| `figma_get_component`           | Get a component by key                   |
| `figma_get_team_component_sets` | Get component sets for a team            |
| `figma_get_team_styles`         | Get styles for a team                    |
| `figma_get_file_styles`         | Get styles from a file                   |
| `figma_get_style`               | Get a style by key                       |
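
These tools are thin wrappers over Figma's REST API, authenticated with `FIGMA_API_KEY`. As a rough sketch of what `figma_get_file` does under the hood (not the tool's actual implementation):

```python
# Sketch only: fetch a file from Figma's REST API, passing the API key
# as the X-Figma-Token header.
import os

import httpx

file_key = "12345abcde"  # example file key
resp = httpx.get(
    f"https://api.figma.com/v1/files/{file_key}",
    headers={"X-Figma-Token": os.environ["FIGMA_API_KEY"]},
)
print(resp.json()["name"])
```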

### XiaoyuZhouFM Tools

| Tool                    | Description                                                                                |
| ----------------------- | ------------------------------------------------------------------------------------------ |
| `xiaoyuzhoufm_download` | Download a podcast episode from XiaoyuZhouFM with optional automatic m4a to mp3 conversion |

### Audio Tools

| Tool               | Description                                                      |
| ------------------ | ---------------------------------------------------------------- |
| `get_audio_length` | Get the length of an audio file in seconds                       |
| `get_audio_text`   | Get transcribed text from a specific time range in an audio file |

### Memory Tools

| Tool             | Description                                                             |
| ---------------- | ----------------------------------------------------------------------- |
| `think`          | Use the tool to think about something and append the thought to the log |
| `get_session_id` | Get the current session ID                                              |
| `remember`       | Store a memory (brief and detail) in the memory database                |
| `recall`         | Query memories from the database with semantic search                   |
| `forget`         | Clear all memories in the memory database                               |
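
A hypothetical round trip (the tool names come from the table above, but the parameter names here are illustrative assumptions; see `mcp_toolbox/enhance/tools.py` for the real signatures):

```python
# Illustrative only: "brief", "detail", and "query" are assumed
# parameter names, not verified against mcp_toolbox/enhance/tools.py.
import asyncio

from mcp_toolbox.enhance.tools import recall, remember


async def demo() -> None:
    await remember(brief="France", detail="The capital of France is Paris.")
    print(await recall(query="France"))  # semantic search over stored memories


asyncio.run(demo())
```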

### Markitdown Tools

| Tool                       | Description                                   |
| -------------------------- | --------------------------------------------- |
| `convert_file_to_markdown` | Convert any file to Markdown using MarkItDown |
| `convert_url_to_markdown`  | Convert a URL to Markdown using MarkItDown    |
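
Both converters are async functions that write the result to `output_file` and return a status dict (see `mcp_toolbox/markitdown/tools.py`); for example:

```python
# Convert a web page to Markdown and save it under your home directory.
import asyncio

from mcp_toolbox.markitdown.tools import convert_url_to_markdown

result = asyncio.run(
    convert_url_to_markdown(url="https://example.com", output_file="~/example.md")
)
print(result)  # {"success": True, "url": "...", "output_file": "..."}
```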

### Web Tools

| Tool                     | Description                                        |
| ------------------------ | -------------------------------------------------- |
| `get_html`               | Get HTML content from a URL                        |
| `save_html`              | Save HTML from a URL to a file                     |
| `search_with_tavily`     | Search the web using Tavily (requires API key)     |
| `search_with_duckduckgo` | Search the web using DuckDuckGo (requires API key) |

### Flux Image Generation Tools

| Tool                  | Description                                                |
| --------------------- | ---------------------------------------------------------- |
| `flux_generate_image` | Generate an image using the Flux API and save it to a file |

## Usage Examples

### Running the MCP Server

```bash
# Run with stdio transport (default)
mcp-toolbox stdio

# Run with SSE transport
mcp-toolbox sse --host localhost --port 9871
```

### Using with Claude Desktop

1. Configure Claude Desktop as shown in the Configuration section
1. Start Claude Desktop
1. Ask Claude to interact with Figma files:
   - "Can you get information about this Figma file: 12345abcde?"
   - "Show me the components in this Figma file: 12345abcde"
   - "Get the comments from this Figma file: 12345abcde"
1. Ask Claude to execute command line instructions:
   - "What files are in the current directory?"
   - "What's the current system time?"
   - "Show me the contents of a specific file."
1. Ask Claude to download podcasts from XiaoyuZhouFM:
   - "Download this podcast episode: https://www.xiaoyuzhoufm.com/episode/67c3d80fb0167b8db9e3ec0f"
   - "Download and convert to MP3 this podcast: https://www.xiaoyuzhoufm.com/episode/67c3d80fb0167b8db9e3ec0f"
1. Ask Claude to work with audio files:
   - "What's the length of this audio file: audio.m4a?"
   - "Transcribe the audio from 60 to 90 seconds in audio.m4a"
   - "Get the text from 2:30 to 3:00 in the audio file"
1. Ask Claude to convert files or URLs to Markdown:
   - "Convert this file to Markdown: document.docx"
   - "Convert this webpage to Markdown: https://example.com"
1. Ask Claude to work with web content:
   - "Get the HTML content from https://example.com"
   - "Save the HTML from https://example.com to a file"
   - "Search the web for 'artificial intelligence news'"
1. Ask Claude to generate images with Flux:
   - "Generate an image of a beautiful sunset over mountains"
   - "Create an image of a futuristic city and save it to my desktop"
   - "Generate a portrait of a cat in a space suit"
1. Ask Claude to use memory tools:
   - "Remember this important fact: The capital of France is Paris"
   - "What's my current session ID?"
   - "Recall any information about France"
   - "Think about the implications of climate change"
   - "Forget all stored memories"

## Development

### Local Setup

Fork the repository and clone it to your local machine.

```bash
# Install in development mode
make install
# Activate a virtual environment
source .venv/bin/activate  # For macOS/Linux
# or
.venv\Scripts\activate  # For Windows
```

### Running Tests

```bash
make test
```

### Running Checks

```bash
make check
```

### Building Documentation

```bash
make docs
```

## Adding New Tools

To add a new API integration:

1. Update `config.py` with any required API keys
1. Create a new module in `mcp_toolbox/`
1. Implement your API client and tools
1. Add tests for your new functionality
1. Update the README.md with new environment variables and tools

See the [development guide](llms.txt) for more detailed instructions.
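
As a minimal sketch, a new tool module follows the same pattern as the existing ones (the module and tool names below are made up for illustration):

```python
# mcp_toolbox/example/tools.py -- hypothetical module name.
from typing import Annotated, Any

from pydantic import Field

from mcp_toolbox.app import mcp


@mcp.tool(description="Echo a message back to the caller.")
async def echo_message(
    message: Annotated[str, Field(description="The message to echo")],
) -> dict[str, Any]:
    """Echo a message back to the caller."""
    return {"success": True, "message": message}
```

Importing the module in `mcp_toolbox/app.py` (optionally behind a config flag) registers the tool with the server.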

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
1. Create a feature branch (`git checkout -b feature/amazing-feature`)
1. Commit your changes (`git commit -m 'Add some amazing feature'`)
1. Push to the branch (`git push origin feature/amazing-feature`)
1. Open a Pull Request

## License

This project is licensed under the terms of the license included in the repository.

```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing to `mcp-toolbox`

Contributions are welcome, and they are greatly appreciated!
Every little bit helps, and credit will always be given.

You can contribute in many ways:

# Types of Contributions

## Report Bugs

Report bugs at https://github.com/ai-zerolab/mcp-toolbox/issues

If you are reporting a bug, please include:

- Your operating system name and version.
- Any details about your local setup that might be helpful in troubleshooting.
- Detailed steps to reproduce the bug.

## Fix Bugs

Look through the GitHub issues for bugs.
Anything tagged with "bug" and "help wanted" is open to whoever wants to implement a fix for it.

## Implement Features

Look through the GitHub issues for features.
Anything tagged with "enhancement" and "help wanted" is open to whoever wants to implement it.

## Write Documentation

mcp-toolbox could always use more documentation, whether as part of the official docs, in docstrings, or even on the web in blog posts, articles, and such.

## Submit Feedback

The best way to send feedback is to file an issue at https://github.com/ai-zerolab/mcp-toolbox/issues.

If you are proposing a new feature:

- Explain in detail how it would work.
- Keep the scope as narrow as possible, to make it easier to implement.
- Remember that this is a volunteer-driven project, and that contributions
  are welcome :)

# Get Started!

## Installing locally

Ready to contribute? Here's how to set up `mcp-toolbox` for local development.
Please note this documentation assumes you already have `uv` and `Git` installed and ready to go.

1. Fork the `mcp-toolbox` repo on GitHub.
1. Clone your fork locally:

```bash
cd <directory_in_which_repo_should_be_created>
git clone git@github.com:YOUR_NAME/mcp-toolbox.git
```

3. Now we need to install the environment. Navigate into the directory:

```bash
cd mcp-toolbox
```

Then, install and activate the environment with:

```bash
make install
```

4. Install pre-commit to run linters/formatters at commit time:

```bash
uv run pre-commit install
```

5. Create a branch for local development:

```bash
git checkout -b name-of-your-bugfix-or-feature
```

Now you can make your changes locally.

Don't forget to add test cases for your added functionality to the `tests` directory.

## After making your changes

When you're done making changes, check that your changes pass the formatting tests.

```bash
make check
```

Now, validate that all unit tests are passing:

```bash
make test
```

Before raising a pull request you should also run tox. This will run the tests across different versions of Python:

```bash
tox
```

This requires you to have multiple versions of Python installed.
This step is also triggered in the CI/CD pipeline, so you could also choose to skip this step locally.

## Commit your changes and push your branch to GitHub:

```bash
git add .
git commit -m "Your detailed description of your changes."
git push origin name-of-your-bugfix-or-feature
```

Submit a pull request through the GitHub website.

# Pull Request Guidelines

Before you submit a pull request, check that it meets these guidelines:

1. The pull request should include tests.
1. If the pull request adds functionality, the docs should be updated.
   Put your new functionality into a function with a docstring, and add the feature to the list in `README.md`.

```

--------------------------------------------------------------------------------
/docs/modules.md:
--------------------------------------------------------------------------------

```markdown

```

--------------------------------------------------------------------------------
/mcp_toolbox/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_toolbox/figma/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_toolbox/markitdown/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_toolbox/web/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/mock/figma/delete_comment.json:
--------------------------------------------------------------------------------

```json
{
  "success": true
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/audio/__init__.py:
--------------------------------------------------------------------------------

```python
"""Audio processing tools."""

```

--------------------------------------------------------------------------------
/mcp_toolbox/flux/__init__.py:
--------------------------------------------------------------------------------

```python
"""Flux API image generation module."""

```

--------------------------------------------------------------------------------
/mcp_toolbox/command_line/__init__.py:
--------------------------------------------------------------------------------

```python
"""Command line tools for MCP-Toolbox."""

```

--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------

```
# pytest.ini
[pytest]
asyncio_mode = auto

```

--------------------------------------------------------------------------------
/mcp_toolbox/xiaoyuzhoufm/__init__.py:
--------------------------------------------------------------------------------

```python
"""XiaoyuZhouFM podcast crawler module."""

```

--------------------------------------------------------------------------------
/mcp_toolbox/enhance/__init__.py:
--------------------------------------------------------------------------------

```python
"""LLM Enhancement tools for MCP-Toolbox."""

```

--------------------------------------------------------------------------------
/mcp_toolbox/file_ops/__init__.py:
--------------------------------------------------------------------------------

```python
"""File operations tools for MCP-Toolbox."""

```

--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------

```json
{
  "python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python"
}

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_image.json:
--------------------------------------------------------------------------------

```json
{
  "err": null,
  "images": {
    "2:0": "https://example.com/images/frame1.png",
    "3:0": "https://example.com/images/text1.png"
  }
}

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_image_fills.json:
--------------------------------------------------------------------------------

```json
{
  "meta": {
    "images": {
      "image1": "https://example.com/images/image1.png",
      "image2": "https://example.com/images/image2.png"
    }
  }
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/log.py:
--------------------------------------------------------------------------------

```python
import os

USER_DEFINED_LOG_LEVEL = os.getenv("MCP_TOOLBOX_LOG_LEVEL", "INFO")

os.environ["LOGURU_LEVEL"] = USER_DEFINED_LOG_LEVEL

from loguru import logger  # noqa: E402

__all__ = ["logger"]

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_team_projects.json:
--------------------------------------------------------------------------------

```json
{
  "projects": [
    {
      "created_at": "2023-01-01T00:00:00Z",
      "id": "project1",
      "name": "Project 1"
    },
    {
      "created_at": "2023-01-02T00:00:00Z",
      "id": "project2",
      "name": "Project 2"
    }
  ]
}

```

--------------------------------------------------------------------------------
/codecov.yaml:
--------------------------------------------------------------------------------

```yaml
coverage:
  range: 70..100
  round: down
  precision: 1
  status:
    project:
      default:
        target: 90%
        threshold: 0.5%
    patch:
      default:
        target: auto
        threshold: 0%
        informational: true
codecov:
  token: f927bff4-d404-4986-8c11-624eadda8431

```

--------------------------------------------------------------------------------
/tests/mock/figma/post_comment.json:
--------------------------------------------------------------------------------

```json
{
  "client_meta": {
    "node_id": "2:0",
    "x": 300,
    "y": 400
  },
  "created_at": "2023-01-04T00:00:00Z",
  "id": "comment3",
  "message": "Test comment",
  "order_id": 3,
  "resolved_at": null,
  "user": {
    "handle": "user1",
    "id": "user1",
    "img_url": "https://example.com/user1.png"
  }
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/cli.py:
--------------------------------------------------------------------------------

```python
import typer

from mcp_toolbox.app import mcp

app = typer.Typer()


@app.command()
def stdio():
    mcp.run(transport="stdio")


@app.command()
def sse(
    host: str = "localhost",
    port: int = 9871,
):
    mcp.settings.host = host
    mcp.settings.port = port
    mcp.run(transport="sse")


if __name__ == "__main__":
    app(["stdio"])

```

--------------------------------------------------------------------------------
/.github/workflows/validate-codecov-config.yml:
--------------------------------------------------------------------------------

```yaml
name: validate-codecov-config

on:
  pull_request:
    paths: [codecov.yaml]
  push:
    branches: [main]

jobs:
  validate-codecov-config:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4
      - name: Validate codecov configuration
        run: curl -sSL --fail-with-body --data-binary @codecov.yaml https://codecov.io/validate

```

--------------------------------------------------------------------------------
/tox.ini:
--------------------------------------------------------------------------------

```
[tox]
skipsdist = true
envlist = py310, py311, py312, py313

[gh-actions]
python =
    3.10: py310
    3.11: py311
    3.12: py312
    3.13: py313

[testenv]
passenv = PYTHON_VERSION
allowlist_externals = uv
commands =
    uv sync --python {envpython}
    uv run python -m pytest --doctest-modules tests --cov --cov-config=pyproject.toml --cov-report=xml

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_project_files.json:
--------------------------------------------------------------------------------

```json
{
  "files": [
    {
      "key": "file1",
      "last_modified": "2023-01-01T00:00:00Z",
      "name": "File 1",
      "thumbnail_url": "https://example.com/thumbnails/file1.png"
    },
    {
      "key": "file2",
      "last_modified": "2023-01-02T00:00:00Z",
      "name": "File 2",
      "thumbnail_url": "https://example.com/thumbnails/file2.png"
    }
  ]
}

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_file_styles.json:
--------------------------------------------------------------------------------

```json
{
  "meta": {
    "styles": {
      "style1": {
        "description": "Primary brand color",
        "key": "style1",
        "name": "Primary Color",
        "remote": false,
        "style_type": "FILL"
      },
      "style2": {
        "description": "Main heading style",
        "key": "style2",
        "name": "Heading 1",
        "remote": false,
        "style_type": "TEXT"
      }
    }
  }
}

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use a slim Python base image
FROM python:3.12-slim

# Install tini
RUN apt-get update && \
    apt-get install -y --no-install-recommends tini && \
    rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Change the working directory to the `app` directory
WORKDIR /app

# Copy the lockfile and `pyproject.toml` into the image
COPY uv.lock /app/uv.lock
COPY pyproject.toml /app/pyproject.toml

# Install dependencies
RUN uv sync --frozen --no-install-project

# Copy the project into the image
COPY . /app

# Sync the project
RUN uv sync --frozen

# Run the server
ENTRYPOINT ["tini", "--", "uv", "run", "mcp-toolbox"]
CMD ["stdio"]

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_style.json:
--------------------------------------------------------------------------------

```json
{
  "containing_file": {
    "key": "file1",
    "name": "UI Styles"
  },
  "created_at": "2023-01-01T00:00:00Z",
  "description": "Primary brand color",
  "key": "style1",
  "name": "Primary Color",
  "sort_position": "1",
  "style_properties": {
    "fills": [
      {
        "color": {
          "a": 1,
          "b": 0.8,
          "g": 0.4,
          "r": 0.2
        },
        "type": "SOLID"
      }
    ]
  },
  "style_type": "FILL",
  "thumbnail_url": "https://example.com/thumbnails/style1.png",
  "updated_at": "2023-01-02T00:00:00Z",
  "user": {
    "handle": "user1",
    "id": "user1",
    "img_url": "https://example.com/user1.png"
  }
}

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required: []
    properties:
      figmaApiKey:
        type: string
        default: ""
        description: Optional API key for Figma integration.
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({ command: 'uv', args: ['run', '--prerelease=allow', 'mcp-toolbox@latest', 'stdio'], env: { FIGMA_API_KEY: config.figmaApiKey } })
  exampleConfig:
    figmaApiKey: your-figma-api-key

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_file_components.json:
--------------------------------------------------------------------------------

```json
{
  "meta": {
    "components": {
      "component1": {
        "containing_frame": {
          "name": "Components",
          "node_id": "4:0",
          "page_id": "1:0",
          "page_name": "Page 1"
        },
        "description": "Standard button component",
        "key": "component1",
        "name": "Button",
        "remote": false
      },
      "component2": {
        "containing_frame": {
          "name": "Components",
          "node_id": "4:0",
          "page_id": "1:0",
          "page_name": "Page 1"
        },
        "description": "Standard input field component",
        "key": "component2",
        "name": "Input Field",
        "remote": false
      }
    }
  }
}

```

--------------------------------------------------------------------------------
/.github/actions/setup-python-env/action.yml:
--------------------------------------------------------------------------------

```yaml
name: "Setup Python Environment"
description: "Set up Python environment for the given Python version"

inputs:
  python-version:
    description: "Python version to use"
    required: true
    default: "3.12"
  uv-version:
    description: "uv version to use"
    required: true
    default: "0.6.2"

runs:
  using: "composite"
  steps:
    - uses: actions/setup-python@v5
      with:
        python-version: ${{ inputs.python-version }}

    - name: Install uv
      uses: astral-sh/setup-uv@v2
      with:
        version: ${{ inputs.uv-version }}
        enable-cache: 'true'
        cache-suffix: ${{ matrix.python-version }}

    - name: Install Python dependencies
      run: uv sync --frozen
      shell: bash

```

--------------------------------------------------------------------------------
/docs/index.md:
--------------------------------------------------------------------------------

```markdown
# mcp-toolbox

[![Release](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/v/release/ai-zerolab/mcp-toolbox)
[![Build status](https://img.shields.io/github/actions/workflow/status/ai-zerolab/mcp-toolbox/main.yml?branch=main)](https://github.com/ai-zerolab/mcp-toolbox/actions/workflows/main.yml?query=branch%3Amain)
[![Commit activity](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/commit-activity/m/ai-zerolab/mcp-toolbox)
[![License](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox)](https://img.shields.io/github/license/ai-zerolab/mcp-toolbox)

A set of tools for enhancing LLM capabilities through the Model Context Protocol (MCP).

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_component.json:
--------------------------------------------------------------------------------

```json
{
  "component_property_definitions": {
    "size": {
      "defaultValue": "medium",
      "type": "VARIANT"
    },
    "variant": {
      "defaultValue": "primary",
      "type": "VARIANT"
    }
  },
  "component_set_id": "component_set1",
  "containing_file": {
    "key": "file1",
    "name": "UI Components"
  },
  "containing_frame": {
    "name": "Components",
    "node_id": "4:0",
    "page_id": "1:0",
    "page_name": "Page 1"
  },
  "created_at": "2023-01-01T00:00:00Z",
  "description": "Standard button component",
  "key": "component1",
  "name": "Button",
  "thumbnail_url": "https://example.com/thumbnails/component1.png",
  "updated_at": "2023-01-02T00:00:00Z",
  "user": {
    "handle": "user1",
    "id": "user1",
    "img_url": "https://example.com/user1.png"
  }
}

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_comments.json:
--------------------------------------------------------------------------------

```json
{
  "comments": [
    {
      "client_meta": {
        "node_id": "2:0",
        "x": 100,
        "y": 200
      },
      "created_at": "2023-01-01T00:00:00Z",
      "id": "comment1",
      "message": "This is a comment",
      "order_id": 1,
      "resolved_at": null,
      "user": {
        "handle": "user1",
        "id": "user1",
        "img_url": "https://example.com/user1.png"
      }
    },
    {
      "client_meta": {
        "node_id": "3:0",
        "x": 150,
        "y": 250
      },
      "created_at": "2023-01-02T00:00:00Z",
      "id": "comment2",
      "message": "Another comment",
      "order_id": 2,
      "resolved_at": "2023-01-03T00:00:00Z",
      "user": {
        "handle": "user2",
        "id": "user2",
        "img_url": "https://example.com/user2.png"
      }
    }
  ]
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/app.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP

from mcp_toolbox.config import Config
from mcp_toolbox.log import logger

mcp = FastMCP("mcp-toolbox")
config = Config()


# Import tools to register them with the MCP server
if config.enable_command_tools:
    import mcp_toolbox.command_line.tools
if config.enable_file_ops_tools:
    import mcp_toolbox.file_ops.tools
if config.enable_audio_tools:
    try:
        import mcp_toolbox.audio.tools
    except ImportError:
        logger.error(
            "Audio tools is not available. Please install the required dependencies. e.g. `pip install mcp-toolbox[audio]`"
        )
if config.enable_enhance_tools:
    import mcp_toolbox.enhance.tools
if config.figma_api_key:
    import mcp_toolbox.figma.tools
if config.bfl_api_key:
    import mcp_toolbox.flux.tools
import mcp_toolbox.markitdown.tools  # noqa: E402
import mcp_toolbox.web.tools  # noqa: E402
import mcp_toolbox.xiaoyuzhoufm.tools  # noqa: E402, F401

# TODO: Add prompt for toolbox's tools

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_team_styles.json:
--------------------------------------------------------------------------------

```json
{
  "styles": [
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Styles"
      },
      "created_at": "2023-01-01T00:00:00Z",
      "description": "Primary brand color",
      "key": "style1",
      "name": "Primary Color",
      "sort_position": "1",
      "style_type": "FILL",
      "thumbnail_url": "https://example.com/thumbnails/style1.png",
      "updated_at": "2023-01-02T00:00:00Z",
      "user": {
        "handle": "user1",
        "id": "user1",
        "img_url": "https://example.com/user1.png"
      }
    },
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Styles"
      },
      "created_at": "2023-01-03T00:00:00Z",
      "description": "Main heading style",
      "key": "style2",
      "name": "Heading 1",
      "sort_position": "2",
      "style_type": "TEXT",
      "thumbnail_url": "https://example.com/thumbnails/style2.png",
      "updated_at": "2023-01-04T00:00:00Z",
      "user": {
        "handle": "user2",
        "id": "user2",
        "img_url": "https://example.com/user2.png"
      }
    }
  ]
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/config.py:
--------------------------------------------------------------------------------

```python
import platform
from pathlib import Path

from pydantic_settings import BaseSettings


class Config(BaseSettings):
    figma_api_key: str | None = None
    tavily_api_key: str | None = None
    duckduckgo_api_key: str | None = None
    bfl_api_key: str | None = None

    enable_command_tools: bool = True
    enable_file_ops_tools: bool = True
    enable_audio_tools: bool = True
    enable_enhance_tools: bool = True
    tool_home: str = Path("~/.zerolab/mcp-toolbox").expanduser().as_posix()

    @property
    def cache_dir(self) -> str:
        return (Path(self.tool_home) / "cache").expanduser().resolve().absolute().as_posix()

    @property
    def memory_file(self) -> str:
        # Use Documents folder for macOS to enable sync across multiple Mac devices
        if platform.system() == "Darwin":  # macOS
            documents_path = Path("~/Documents/zerolab/mcp-toolbox").expanduser()
            documents_path.mkdir(parents=True, exist_ok=True)
            return (documents_path / "memory").resolve().absolute().as_posix()
        else:
            # Default behavior for other operating systems
            return (Path(self.tool_home) / "memory").expanduser().resolve().absolute().as_posix()


if __name__ == "__main__":
    print(Config())

```

--------------------------------------------------------------------------------
/generate_config_template.py:
--------------------------------------------------------------------------------

```python
import json
import shutil
import sys
from pathlib import Path

from mcp_toolbox.config import Config


def get_endpoint_path() -> str:
    """
    Find the path to the mcp-toolbox script.
    Similar to the 'which' command in Unix-like systems.

    Returns:
        str: The full path to the mcp-toolbox script
    """
    # First try using shutil.which to find the script in PATH
    script_path = shutil.which("mcp-toolbox")
    if script_path:
        return script_path

    # If not found in PATH, try to find it in the current Python environment
    # This handles cases where the script is installed but not in PATH
    bin_dir = Path(sys.executable).parent
    possible_paths = [
        bin_dir / "mcp-toolbox",
        bin_dir / "mcp-toolbox.exe",  # For Windows
    ]

    for path in possible_paths:
        if path.exists():
            return str(path)

    # If we can't find it, return the script name and hope it's in PATH when executed
    return "mcp-toolbox"


if __name__ == "__main__":
    endpoint_path = get_endpoint_path()

    mcp_config = {
        "command": endpoint_path,
        "args": ["stdio"],
        "env": {field.upper(): "" for field in Config.model_fields},
    }

    mcp_item = {
        "zerolab-toolbox-dev": mcp_config,
    }

    print(json.dumps(mcp_item, indent=4))

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_team_components.json:
--------------------------------------------------------------------------------

```json
{
  "components": [
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Components"
      },
      "containing_frame": {
        "name": "Components",
        "node_id": "4:0",
        "page_id": "1:0",
        "page_name": "Page 1"
      },
      "created_at": "2023-01-01T00:00:00Z",
      "description": "Standard button component",
      "key": "component1",
      "name": "Button",
      "thumbnail_url": "https://example.com/thumbnails/component1.png",
      "updated_at": "2023-01-02T00:00:00Z",
      "user": {
        "handle": "user1",
        "id": "user1",
        "img_url": "https://example.com/user1.png"
      }
    },
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Components"
      },
      "containing_frame": {
        "name": "Components",
        "node_id": "4:0",
        "page_id": "1:0",
        "page_name": "Page 1"
      },
      "created_at": "2023-01-03T00:00:00Z",
      "description": "Standard input field component",
      "key": "component2",
      "name": "Input Field",
      "thumbnail_url": "https://example.com/thumbnails/component2.png",
      "updated_at": "2023-01-04T00:00:00Z",
      "user": {
        "handle": "user2",
        "id": "user2",
        "img_url": "https://example.com/user2.png"
      }
    }
  ]
}

```

--------------------------------------------------------------------------------
/mkdocs.yml:
--------------------------------------------------------------------------------

```yaml
site_name: mcp-toolbox
repo_url: https://github.com/ai-zerolab/mcp-toolbox
site_url: https://ai-zerolab.github.io/mcp-toolbox
site_description: A set of tools for enhancing LLM capabilities through the Model Context Protocol (MCP).
site_author: ai-zerolab
edit_uri: edit/main/docs/
repo_name: ai-zerolab/mcp-toolbox
copyright: Maintained by <a href="https://ai-zerolab.com">ai-zerolab</a>.

nav:
  - Home: index.md
  - Modules: modules.md
plugins:
  - search
  - mkdocstrings:
      handlers:
        python:
          paths: ["mcp_toolbox"]
theme:
  name: material
  feature:
    tabs: true
  palette:
    - media: "(prefers-color-scheme: light)"
      scheme: default
      primary: white
      accent: deep orange
      toggle:
        icon: material/brightness-7
        name: Switch to dark mode
    - media: "(prefers-color-scheme: dark)"
      scheme: slate
      primary: black
      accent: deep orange
      toggle:
        icon: material/brightness-4
        name: Switch to light mode
  icon:
    repo: fontawesome/brands/github

extra:
  social:
    - icon: fontawesome/brands/github
      link: https://github.com/ai-zerolab/mcp-toolbox
    - icon: fontawesome/brands/python
      link: https://pypi.org/project/mcp-toolbox

markdown_extensions:
  - toc:
      permalink: true
  - pymdownx.arithmatex:
      generic: true

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_team_component_sets.json:
--------------------------------------------------------------------------------

```json
{
  "component_sets": [
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Components"
      },
      "containing_frame": {
        "name": "Components",
        "node_id": "4:0",
        "page_id": "1:0",
        "page_name": "Page 1"
      },
      "created_at": "2023-01-01T00:00:00Z",
      "description": "Button component set with variants",
      "key": "component_set1",
      "name": "Button",
      "thumbnail_url": "https://example.com/thumbnails/component_set1.png",
      "updated_at": "2023-01-02T00:00:00Z",
      "user": {
        "handle": "user1",
        "id": "user1",
        "img_url": "https://example.com/user1.png"
      }
    },
    {
      "containing_file": {
        "key": "file1",
        "name": "UI Components"
      },
      "containing_frame": {
        "name": "Components",
        "node_id": "4:0",
        "page_id": "1:0",
        "page_name": "Page 1"
      },
      "created_at": "2023-01-03T00:00:00Z",
      "description": "Input field component set with variants",
      "key": "component_set2",
      "name": "Input Field",
      "thumbnail_url": "https://example.com/thumbnails/component_set2.png",
      "updated_at": "2023-01-04T00:00:00Z",
      "user": {
        "handle": "user2",
        "id": "user2",
        "img_url": "https://example.com/user2.png"
      }
    }
  ]
}

```

--------------------------------------------------------------------------------
/.github/workflows/main.yml:
--------------------------------------------------------------------------------

```yaml
name: Main

on:
  push:
    branches:
      - main
  pull_request:
    types: [opened, synchronize, reopened, ready_for_review]

jobs:
  quality:
    runs-on: ubuntu-latest
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - uses: actions/cache@v4
        with:
          path: ~/.cache/pre-commit
          key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}

      - name: Set up the environment
        uses: ./.github/actions/setup-python-env

      - name: Run checks
        run: make check

  tests-and-type-check:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12", "3.13"]
      fail-fast: false
    defaults:
      run:
        shell: bash
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - name: Set up the environment
        uses: ./.github/actions/setup-python-env
        with:
          python-version: ${{ matrix.python-version }}

      - name: Run tests
        run: uv run python -m pytest tests --cov --cov-config=pyproject.toml --cov-report=xml

      - name: Upload coverage reports to Codecov with GitHub Action on Python 3.11
        uses: codecov/codecov-action@v4
        if: ${{ matrix.python-version == '3.11' }}

  check-docs:
    runs-on: ubuntu-latest
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - name: Set up the environment
        uses: ./.github/actions/setup-python-env

      - name: Check if documentation can be built
        run: uv run mkdocs build -s

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_file.json:
--------------------------------------------------------------------------------

```json
{
  "components": {},
  "document": {
    "children": [
      {
        "children": [
          {
            "absoluteBoundingBox": {
              "height": 100,
              "width": 100,
              "x": 0,
              "y": 0
            },
            "background": [
              {
                "blendMode": "NORMAL",
                "color": {
                  "a": 1,
                  "b": 1,
                  "g": 1,
                  "r": 1
                },
                "type": "SOLID"
              }
            ],
            "backgroundColor": {
              "a": 1,
              "b": 1,
              "g": 1,
              "r": 1
            },
            "blendMode": "PASS_THROUGH",
            "children": [],
            "clipsContent": true,
            "constraints": {
              "horizontal": "LEFT",
              "vertical": "TOP"
            },
            "effects": [],
            "fills": [
              {
                "blendMode": "NORMAL",
                "color": {
                  "a": 1,
                  "b": 1,
                  "g": 1,
                  "r": 1
                },
                "type": "SOLID"
              }
            ],
            "id": "2:0",
            "name": "Frame 1",
            "strokeAlign": "INSIDE",
            "strokeWeight": 1,
            "strokes": [],
            "type": "FRAME"
          }
        ],
        "id": "1:0",
        "name": "Page 1",
        "type": "CANVAS"
      }
    ],
    "id": "0:0",
    "name": "Document",
    "type": "DOCUMENT"
  },
  "editorType": "figma",
  "lastModified": "2023-01-01T00:00:00Z",
  "name": "Test File",
  "role": "owner",
  "schemaVersion": 0,
  "styles": {},
  "thumbnailUrl": "https://example.com/thumbnail.png",
  "version": "123"
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/markitdown/tools.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from typing import Annotated, Any

from markitdown import MarkItDown
from pydantic import Field

from mcp_toolbox.app import mcp

md = MarkItDown(enable_builtins=True, enable_plugins=True)


@mcp.tool(
    description="Convert any file to Markdown, using MarkItDown.",
)
async def convert_file_to_markdown(
    input_file: Annotated[str, Field(description="The input file to convert")],
    output_file: Annotated[str, Field(description="The output Markdown file")],
) -> dict[str, Any]:
    """Convert any file to Markdown

    Args:
        input_file: The input file to convert
        output_file: The output Markdown file
    """
    input_file: Path = Path(input_file).expanduser().resolve().absolute()
    output_file: Path = Path(output_file).expanduser().resolve().absolute()

    if not input_file.is_file():
        return {
            "error": f"Input file not found: {input_file.as_posix()}",
            "success": False,
        }

    output_file.parent.mkdir(parents=True, exist_ok=True)

    c = md.convert(input_file.as_posix()).text_content
    output_file.write_text(c)

    return {
        "success": True,
        "input_file": input_file.as_posix(),
        "output_file": output_file.as_posix(),
    }


@mcp.tool(
    description="Convert a URL to Markdown, using MarkItDown.",
)
async def convert_url_to_markdown(
    url: Annotated[str, Field(description="The URL to convert")],
    output_file: Annotated[str, Field(description="The output Markdown file")],
) -> dict[str, Any]:
    """Convert a URL to Markdown

    Args:
        url: The URL to convert
        output_file: The output Markdown file
    """
    output_file: Path = Path(output_file).expanduser().resolve().absolute()

    output_file.parent.mkdir(parents=True, exist_ok=True)

    c = md.convert_url(url).text_content
    output_file.write_text(c)

    return {
        "success": True,
        "url": url,
        "output_file": output_file.as_posix(),
    }

```

--------------------------------------------------------------------------------
/tests/mock/figma/get_file_nodes.json:
--------------------------------------------------------------------------------

```json
{
  "nodes": {
    "2:0": {
      "document": {
        "absoluteBoundingBox": {
          "height": 100,
          "width": 100,
          "x": 0,
          "y": 0
        },
        "background": [
          {
            "blendMode": "NORMAL",
            "color": {
              "a": 1,
              "b": 1,
              "g": 1,
              "r": 1
            },
            "type": "SOLID"
          }
        ],
        "backgroundColor": {
          "a": 1,
          "b": 1,
          "g": 1,
          "r": 1
        },
        "blendMode": "PASS_THROUGH",
        "children": [],
        "clipsContent": true,
        "constraints": {
          "horizontal": "LEFT",
          "vertical": "TOP"
        },
        "effects": [],
        "fills": [
          {
            "blendMode": "NORMAL",
            "color": {
              "a": 1,
              "b": 1,
              "g": 1,
              "r": 1
            },
            "type": "SOLID"
          }
        ],
        "id": "2:0",
        "name": "Frame 1",
        "strokeAlign": "INSIDE",
        "strokeWeight": 1,
        "strokes": [],
        "type": "FRAME"
      }
    },
    "3:0": {
      "document": {
        "absoluteBoundingBox": {
          "height": 20,
          "width": 80,
          "x": 10,
          "y": 10
        },
        "blendMode": "PASS_THROUGH",
        "characters": "Hello World",
        "constraints": {
          "horizontal": "LEFT",
          "vertical": "TOP"
        },
        "effects": [],
        "fills": [
          {
            "blendMode": "NORMAL",
            "color": {
              "a": 1,
              "b": 0,
              "g": 0,
              "r": 0
            },
            "type": "SOLID"
          }
        ],
        "id": "3:0",
        "name": "Text Layer",
        "strokeAlign": "INSIDE",
        "strokeWeight": 1,
        "strokes": [],
        "style": {
          "fontFamily": "Roboto",
          "fontPostScriptName": "Roboto-Regular",
          "fontSize": 14,
          "fontWeight": 400,
          "letterSpacing": 0,
          "lineHeightPercent": 100,
          "lineHeightPx": 16.4,
          "textAlignHorizontal": "LEFT",
          "textAlignVertical": "TOP"
        },
        "type": "TEXT"
      }
    }
  }
}

```

--------------------------------------------------------------------------------
/mcp_toolbox/command_line/tools.py:
--------------------------------------------------------------------------------

```python
"""Command line execution tools for MCP-Toolbox."""

import asyncio
import contextlib
import os
from pathlib import Path
from typing import Annotated, Any

from pydantic import Field

from mcp_toolbox.app import mcp


@mcp.tool(description="Execute a command line instruction.")
async def execute_command(
    command: Annotated[list[str], Field(description="The command to execute as a list of strings")],
    timeout_seconds: Annotated[int, Field(default=30, description="Maximum execution time in seconds")] = 30,
    working_dir: Annotated[str | None, Field(default=None, description="Directory to execute the command in")] = None,
) -> dict[str, Any]:
    """Execute a command line instruction."""
    if not command:
        return {
            "error": "Command cannot be empty",
            "stdout": "",
            "stderr": "",
            "return_code": 1,
        }

    try:
        # Expand user home directory in working_dir if provided
        expanded_working_dir = Path(working_dir).expanduser() if working_dir else working_dir

        # Create subprocess with current environment
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=os.environ,
            cwd=expanded_working_dir,
        )

        try:
            # Wait for the process with timeout
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout_seconds)

            # Decode output
            stdout_str = stdout.decode("utf-8", errors="replace") if stdout else ""
            stderr_str = stderr.decode("utf-8", errors="replace") if stderr else ""

            return {
                "stdout": stdout_str,
                "stderr": stderr_str,
                "return_code": process.returncode,
            }

        except asyncio.TimeoutError:
            # Kill the process if it times out
            with contextlib.suppress(ProcessLookupError):
                process.kill()

            return {
                "error": f"Command execution timed out after {timeout_seconds} seconds",
                "stdout": "",
                "stderr": "",
                "return_code": 124,  # Standard timeout return code
            }

    except Exception as e:
        return {
            "error": f"Failed to execute command: {e!s}",
            "stdout": "",
            "stderr": "",
            "return_code": 1,
        }

```
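
A hedged usage sketch for the tool above, assuming `mcp-toolbox` is importable; the coroutine can be driven directly with `asyncio.run`, just as the unit tests do.

```python
# Sketch: run a short-lived command with a timeout and working directory.
import asyncio

from mcp_toolbox.command_line.tools import execute_command

result = asyncio.run(
    execute_command(["ls", "-la"], timeout_seconds=10, working_dir="~")
)
print(result["return_code"])
print(result["stdout"])
```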

--------------------------------------------------------------------------------
/.github/workflows/on-release-main.yml:
--------------------------------------------------------------------------------

```yaml
name: release-main

permissions:
  contents: write
  packages: write

on:
  release:
    types: [published]

jobs:
  set-version:
    runs-on: ubuntu-24.04
    steps:
      - uses: actions/checkout@v4

      - name: Export tag
        id: vars
        run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT
        if: ${{ github.event_name == 'release' }}

      - name: Update project version
        run: |
          sed -i "s/^version = \".*\"/version = \"$RELEASE_VERSION\"/" pyproject.toml
        env:
          RELEASE_VERSION: ${{ steps.vars.outputs.tag }}
        if: ${{ github.event_name == 'release' }}

      - name: Upload updated pyproject.toml
        uses: actions/upload-artifact@v4
        with:
          name: pyproject-toml
          path: pyproject.toml

  publish:
    runs-on: ubuntu-latest
    needs: [set-version]
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - name: Set up the environment
        uses: ./.github/actions/setup-python-env

      - name: Download updated pyproject.toml
        uses: actions/download-artifact@v4
        with:
          name: pyproject-toml

      - name: Build package
        run: uv build

      - name: Publish package
        run: uv publish
        env:
          UV_PUBLISH_TOKEN: ${{ secrets.PYPI_TOKEN }}

      - name: Upload dists to release
        uses: svenstaro/upload-release-action@v2
        with:
            repo_token: ${{ secrets.GITHUB_TOKEN }}
            file: dist/*
            file_glob: true
            tag: ${{ github.ref }}
            overwrite: true


  push-image:
    runs-on: ubuntu-latest
    needs: [set-version]
    steps:
      - uses: actions/checkout@v4
      - name: Export tag
        id: vars
        run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT
        if: ${{ github.event_name == 'release' }}
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Login to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ai-zerolab
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push image
        id: docker_build_publish
        uses: docker/build-push-action@v5
        with:
            context: .
            platforms: linux/amd64,linux/arm64/v8
            cache-from: type=gha
            cache-to: type=gha,mode=max
            file: ./Dockerfile
            push: true
            tags: |
              ghcr.io/ai-zerolab/mcp-toolbox:${{ steps.vars.outputs.tag }}
              ghcr.io/ai-zerolab/mcp-toolbox:latest

  deploy-docs:
    needs: publish
    runs-on: ubuntu-latest
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - name: Set up the environment
        uses: ./.github/actions/setup-python-env

      - name: Deploy documentation
        run: uv run mkdocs gh-deploy --force

```

--------------------------------------------------------------------------------
/mcp_toolbox/enhance/tools.py:
--------------------------------------------------------------------------------

```python
from typing import Annotated

from pydantic import Field

from mcp_toolbox.app import mcp
from mcp_toolbox.log import logger


@mcp.tool(
    description="Use the tool to think about something. It will not obtain new information or change the database, but just append the thought to the log. Use it when complex reasoning or some cache memory is needed."
)
async def think(
    thought: Annotated[str, Field(description="A thought to think about.")],
) -> dict[str, str]:
    """
    see: https://www.anthropic.com/engineering/claude-think-tool
    """

    return {
        "thought": thought,
    }


try:
    from mcp_toolbox.enhance.memory import LocalMemory, get_current_session_memory
except ImportError:
    logger.error(
        "Memory tools are not available. Please install the required dependencies, e.g. `pip install mcp-toolbox[memory]`."
    )
else:

    @mcp.tool(description="Get the current session id.")
    def get_session_id() -> dict[str, str]:
        memory: LocalMemory = get_current_session_memory()
        return {"session_id": memory.session_id}

    @mcp.tool(description="Store a memory in the memory database.")
    def remember(
        brief: Annotated[str, Field(description="The brief information of the memory.")],
        detail: Annotated[str, Field(description="The detailed information of the brief text.")],
    ) -> dict[str, str]:
        memory: LocalMemory = get_current_session_memory()
        memory.store(brief, detail)
        return {
            "session_id": memory.session_id,
            "brief": brief,
            "detail": detail,
        }

    @mcp.tool(description="Query a memory from the memory database.")
    def recall(
        query: Annotated[str, Field(description="The query to search in the memory database.")],
        top_k: Annotated[
            int,
            Field(
                description="The maximum number of results to return. Default to 5.",
                default=5,
            ),
        ] = 5,
        cross_session: Annotated[
            bool,
            Field(
                description="Whether to search across all sessions. Default to True.",
                default=True,
            ),
        ] = True,
        session_id: Annotated[
            str | None,
            Field(
                description="The session id of the memory. If not provided, the current session id will be used.",
                default=None,
            ),
        ] = None,
    ) -> list[dict[str, str]]:
        if session_id:
            memory = LocalMemory.use_session(session_id)
        else:
            memory: LocalMemory = get_current_session_memory()
        results = memory.query(query, top_k=top_k, cross_session=cross_session)
        return [r.model_dump(exclude_none=True) for r in results]

    @mcp.tool(description="Clear all memories in the memory database.")
    def forget() -> dict[str, str]:
        memory: LocalMemory = get_current_session_memory()
        memory.clear()
        return {"message": "All memories are cleared."}

```
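
A short sketch of the memory tools, assuming the optional dependencies are installed (`pip install mcp-toolbox[memory]`); these tools are synchronous, so no event loop is needed.

```python
# Sketch: store one memory in the current session, then search for it.
from mcp_toolbox.enhance.tools import get_session_id, recall, remember

print(get_session_id())  # {"session_id": "..."}
remember(brief="favorite color", detail="The user prefers dark blue.")
for hit in recall(query="color", top_k=3):
    print(hit["brief"], "->", hit["detail"])
```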

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-toolbox"
version = "0.0.0.dev"
description = "Maintenance of a set of tools to enhance LLM through MCP protocols."
authors = [{ name = "ai-zerolab", email = "[email protected]" }]
readme = "README.md"
keywords = ['MCP', "Model Context Protocol", "LLM"]
requires-python = ">=3.10"
classifiers = [
    "Intended Audience :: Developers",
    "Programming Language :: Python",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
    "anyio>=4.8.0",
    "duckduckgo-search>=7.5.2",
    "httpx>=0.28.1",
    "loguru>=0.7.3",
    "markitdown[all]~=0.1.0a1",
    "mcp[cli]>=1.3.0",
    "numpy>=2.1.3",
    "pillow>=11.1.0",
    "pydantic>=2.10.6",
    "pydantic-settings[toml]>=2.8.0",
    "tavily-python>=0.5.1",
    "typer>=0.15.2",
]

[project.urls]
Homepage = "https://ai-zerolab.github.io/mcp-toolbox/"
Repository = "https://github.com/ai-zerolab/mcp-toolbox"
Documentation = "https://ai-zerolab.github.io/mcp-toolbox/"

[dependency-groups]

dev = [
    "pytest>=7.2.0",
    "pre-commit>=2.20.0",
    "tox-uv>=1.11.3",
    "deptry>=0.22.0",
    "pytest-cov>=4.0.0",
    "ruff>=0.9.2",
    "mkdocs>=1.4.2",
    "mkdocs-material>=8.5.10",
    "mkdocstrings[python]>=0.26.1",
    "pytest-asyncio>=0.25.3",
    "mcp-toolbox[all]",
]
[project.optional-dependencies]
audio = ["openai-whisper>=20240930 ; python_version <= '3.12'"]
memory = ["fastembed>=0.6.0", "portalocker>=3.1.1"]
all = ["mcp-toolbox[audio, memory]"]

[project.scripts]
mcp-toolbox = "mcp_toolbox.cli:app"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.setuptools]
py-modules = ["mcp_toolbox"]


[tool.pytest.ini_options]
testpaths = ["tests"]

[tool.ruff]
target-version = "py310"
line-length = 120
fix = true

[tool.ruff.lint]
select = [
    # flake8-2020
    "YTT",
    # flake8-bandit
    "S",
    # flake8-bugbear
    "B",
    # flake8-builtins
    "A",
    # flake8-comprehensions
    "C4",
    # flake8-debugger
    "T10",
    # flake8-simplify
    "SIM",
    # isort
    "I",
    # mccabe
    "C90",
    # pycodestyle
    "E",
    "W",
    # pyflakes
    "F",
    # pygrep-hooks
    "PGH",
    # pyupgrade
    "UP",
    # ruff
    "RUF",
    # tryceratops
    "TRY",
]
ignore = [
    # LineTooLong
    "E501",
    # DoNotAssignLambda
    "E731",
    # raise-vanilla-args
    "TRY003",
    # try-consider-else
    "TRY300",
    # raise-within-try
    "TRY301",
]

[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101", "C901", "F841", "S108", "F821"]
"mcp_toolbox/flux/api.py" = ["C901", "SIM102"]

[tool.ruff.format]
preview = true

[tool.coverage.report]
skip_empty = true

[tool.coverage.run]
branch = true
source = ["mcp_toolbox"]
omit = ["mcp_toolbox/flux/api.py"]


[tool.deptry]
exclude = ["mcp_toolbox/app.py", ".venv", "tests"]

[tool.deptry.per_rule_ignores]
DEP002 = ["mcp", "mcp-toolbox"]

```

--------------------------------------------------------------------------------
/tests/audio/test_audio_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for audio tools."""

from unittest.mock import MagicMock, patch

import pytest

try:
    from mcp_toolbox.audio.tools import get_audio_length, get_audio_text
except ImportError:
    pytest.skip("Audio tools are not available.", allow_module_level=True)


@pytest.fixture
def mock_whisper():
    """Mock whisper module."""
    with patch("mcp_toolbox.audio.tools.whisper") as mock_whisper:
        # Mock load_audio to return a numpy array of a specific length
        mock_audio = MagicMock()
        mock_audio.__len__.return_value = 16000 * 60  # 60 seconds of audio at 16kHz
        mock_whisper.load_audio.return_value = mock_audio

        # Mock the model
        mock_model = MagicMock()
        mock_model.detect_language.return_value = (None, {"en": 0.9, "zh": 0.1})
        mock_model.transcribe.return_value = {"text": "Successfully transcribed audio"}
        mock_whisper.load_model.return_value = mock_model

        yield mock_whisper


@pytest.fixture
def mock_os_path_exists():
    """Mock os.path.exists to return True."""
    with patch("os.path.exists", return_value=True):
        yield


@pytest.mark.asyncio
async def test_get_audio_length(mock_whisper, mock_os_path_exists):
    """Test get_audio_length function."""
    result = await get_audio_length("test.m4a")

    # Check that the function returns the expected values
    assert "duration_seconds" in result
    assert "formatted_duration" in result
    assert "message" in result
    assert result["duration_seconds"] == 60.0
    assert result["formatted_duration"] == "0:01:00"
    assert "60.00 seconds" in result["message"]

    # Check that whisper.load_audio was called with the correct arguments
    mock_whisper.load_audio.assert_called_once_with("test.m4a")


@pytest.mark.asyncio
async def test_get_audio_length_file_not_found():
    """Test get_audio_length function with a non-existent file."""
    with patch("os.path.exists", return_value=False):
        result = await get_audio_length("nonexistent.m4a")

    # Check that the function returns an error
    assert "error" in result
    assert "message" in result
    assert "not found" in result["error"]


@pytest.mark.asyncio
async def test_get_audio_text(mock_whisper, mock_os_path_exists):
    """Test get_audio_text function."""
    # Set up global variables in the module
    with patch("mcp_toolbox.audio.tools._detected_language", "en"):
        result = await get_audio_text("test.m4a", 10.0, 20.0, "base")

    # Check that the function returns the expected values
    assert "text" in result
    assert "start_time" in result
    assert "end_time" in result
    assert "time_range" in result
    assert "language" in result
    assert "message" in result
    assert result["text"] == "Successfully transcribed audio"
    assert result["start_time"] == 10.0
    assert result["end_time"] == 20.0
    assert result["time_range"] == "0:00:10 - 0:00:20"
    assert "Successfully transcribed audio" in result["message"]

    # Check that whisper.load_model and transcribe were called
    mock_whisper.load_model.assert_called()
    mock_whisper.load_model().transcribe.assert_called()


@pytest.mark.asyncio
async def test_get_audio_text_file_not_found():
    """Test get_audio_text function with a non-existent file."""
    with patch("os.path.exists", return_value=False):
        result = await get_audio_text("nonexistent.m4a", 10.0, 20.0)

    # Check that the function returns an error
    assert "error" in result
    assert "message" in result
    assert "not found" in result["error"]

```

--------------------------------------------------------------------------------
/mcp_toolbox/flux/tools.py:
--------------------------------------------------------------------------------

```python
"""Flux API image generation tools."""

from pathlib import Path
from typing import Annotated, Any

from loguru import logger
from pydantic import Field

from mcp_toolbox.app import mcp
from mcp_toolbox.config import Config
from mcp_toolbox.flux.api import ApiException, ImageRequest


@mcp.tool(description="Generate an image using the Flux API and save it to a local file.")
async def flux_generate_image(
    prompt: Annotated[str, Field(description="The text prompt for image generation")],
    output_dir: Annotated[str, Field(description="The directory to save the image")],
    model_name: Annotated[str, Field(default="flux.1.1-pro", description="The model version to use")] = "flux.1.1-pro",
    width: Annotated[int | None, Field(default=None, description="Width of the image in pixels")] = None,
    height: Annotated[int | None, Field(default=None, description="Height of the image in pixels")] = None,
    seed: Annotated[int | None, Field(default=None, description="Seed for reproducibility")] = None,
) -> dict[str, Any]:
    """Generate an image using the Flux API and save it to a local file.

    Args:
        prompt: The text prompt for image generation
        output_dir: The directory to save the image
        model_name: The model version to use (default: flux.1.1-pro)
        width: Width of the image in pixels (must be a multiple of 32, between 256 and 1440)
        height: Height of the image in pixels (must be a multiple of 32, between 256 and 1440)
        seed: Optional seed for reproducibility

    Returns:
        A dictionary containing information about the generated image
    """
    config = Config()

    if not config.bfl_api_key:
        return {
            "success": False,
            "error": "BFL_API_KEY not provided. Set BFL_API_KEY environment variable.",
        }

    try:
        # Create output directory if it doesn't exist
        output_path = Path(output_dir).expanduser().resolve()
        output_path.mkdir(parents=True, exist_ok=True)

        # Generate a filename based on the prompt
        filename = "_".join(prompt.split()[:5]).lower()
        filename = "".join(c if c.isalnum() or c == "_" else "_" for c in filename)
        if len(filename) > 50:
            filename = filename[:50]

        # Full path for the image (extension will be added by the save method)
        image_path = output_path / filename

        logger.info(f"Generating image with prompt: {prompt}")

        # Create image request
        image_request = ImageRequest(
            prompt=prompt,
            name=model_name,
            width=width,
            height=height,
            seed=seed,
            api_key=config.bfl_api_key,
            validate=True,
        )

        # Request and save the image
        logger.info("Requesting image from Flux API...")
        await image_request.request()

        logger.info("Waiting for image generation to complete...")
        await image_request.retrieve()

        logger.info("Saving image to disk...")
        saved_path = await image_request.save(str(image_path))

        # Get the image URL
        image_url = await image_request.get_url()

        return {
            "success": True,
            "prompt": prompt,
            "model": model_name,
            "image_path": saved_path,
            "image_url": image_url,
            "message": f"Successfully generated and saved image to {saved_path}",
        }

    except ApiException as e:
        return {
            "success": False,
            "error": f"API error: {e}",
            "message": f"Failed to generate image: {e}",
        }
    except ValueError as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Invalid parameters: {e}",
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Failed to generate image: {e}",
        }

```
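
A usage sketch, assuming `BFL_API_KEY` is set in the environment; per the docstring above, width and height must be multiples of 32 between 256 and 1440.

```python
# Sketch: generate one image and report where it was saved.
import asyncio

from mcp_toolbox.flux.tools import flux_generate_image

result = asyncio.run(
    flux_generate_image(
        prompt="a watercolor lighthouse at dusk",
        output_dir="~/Pictures/flux",
        width=768,
        height=512,
        seed=42,
    )
)
print(result.get("image_path") or result.get("error"))
```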

--------------------------------------------------------------------------------
/tests/flux/test_flux_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for Flux API tools."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_toolbox.flux.api import ApiException
from mcp_toolbox.flux.tools import flux_generate_image


@pytest.fixture
def mock_config():
    """Mock Config with BFL_API_KEY."""
    with patch("mcp_toolbox.flux.tools.Config") as mock_config:
        config_instance = MagicMock()
        config_instance.bfl_api_key = "test_api_key"
        mock_config.return_value = config_instance
        yield mock_config


@pytest.fixture
def mock_image_request():
    """Mock ImageRequest class."""
    with patch("mcp_toolbox.flux.tools.ImageRequest") as mock_class:
        instance = AsyncMock()
        instance.request = AsyncMock()
        instance.retrieve = AsyncMock(return_value={"sample": "https://example.com/image.png"})
        instance.get_url = AsyncMock(return_value="https://example.com/image.png")
        instance.save = AsyncMock(return_value="/path/to/saved/image.png")
        mock_class.return_value = instance
        yield mock_class, instance


@pytest.mark.asyncio
async def test_flux_generate_image_success(mock_config, mock_image_request):
    """Test successful image generation."""
    mock_class, mock_instance = mock_image_request

    result = await flux_generate_image(
        prompt="a beautiful landscape",
        output_dir="/tmp/images",
        model_name="flux.1.1-pro",
        width=512,
        height=512,
        seed=42,
    )

    # Check that ImageRequest was created with correct parameters
    mock_class.assert_called_once_with(
        prompt="a beautiful landscape",
        name="flux.1.1-pro",
        width=512,
        height=512,
        seed=42,
        api_key="test_api_key",
        validate=True,
    )

    # Check that methods were called
    mock_instance.request.assert_called_once()
    mock_instance.retrieve.assert_called_once()
    mock_instance.save.assert_called_once()
    mock_instance.get_url.assert_called_once()

    # Check result
    assert result["success"] is True
    assert result["prompt"] == "a beautiful landscape"
    assert result["model"] == "flux.1.1-pro"
    assert result["image_path"] == "/path/to/saved/image.png"
    assert result["image_url"] == "https://example.com/image.png"
    assert "Successfully generated" in result["message"]


@pytest.mark.asyncio
async def test_flux_generate_image_no_api_key():
    """Test image generation with no API key."""
    with patch("mcp_toolbox.flux.tools.Config") as mock_config:
        config_instance = MagicMock()
        config_instance.bfl_api_key = None
        mock_config.return_value = config_instance

        result = await flux_generate_image(
            prompt="a beautiful landscape",
            output_dir="/tmp/images",
        )

        assert result["success"] is False
        assert "BFL_API_KEY not provided" in result["error"]


@pytest.mark.asyncio
async def test_flux_generate_image_api_exception(mock_config):
    """Test image generation with API exception."""
    with patch("mcp_toolbox.flux.tools.ImageRequest") as mock_class:
        instance = AsyncMock()
        instance.request = AsyncMock(side_effect=ApiException(400, "Invalid request"))
        mock_class.return_value = instance

        result = await flux_generate_image(
            prompt="a beautiful landscape",
            output_dir="/tmp/images",
        )

        assert result["success"] is False
        assert "API error" in result["error"]


@pytest.mark.asyncio
async def test_flux_generate_image_value_error(mock_config):
    """Test image generation with value error."""
    with patch("mcp_toolbox.flux.tools.ImageRequest") as mock_class:
        instance = AsyncMock()
        instance.request = AsyncMock(side_effect=ValueError("Invalid width"))
        mock_class.return_value = instance

        result = await flux_generate_image(
            prompt="a beautiful landscape",
            output_dir="/tmp/images",
            width=123,  # Not a multiple of 32
        )

        assert result["success"] is False
        assert "Invalid parameters" in result["message"]

```

--------------------------------------------------------------------------------
/tests/markitdown/test_markitdown_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for Markitdown tools."""

from unittest.mock import MagicMock, patch

import pytest

from mcp_toolbox.markitdown.tools import (
    convert_file_to_markdown,
    convert_url_to_markdown,
    md,
)


# Mock for MarkItDown.convert method
@pytest.fixture
def mock_markitdown_convert():
    """Mock for MarkItDown.convert method."""
    with patch.object(md, "convert") as mock_convert:
        # Set up the mock to return a result with text_content
        mock_result = MagicMock()
        mock_result.text_content = "# Converted Markdown\n\nThis is converted content."
        mock_convert.return_value = mock_result
        yield mock_convert


@pytest.fixture
def mock_markitdown_convert_url():
    """Mock for MarkItDown.convert method."""
    with patch.object(md, "convert_url") as mock_convert:
        # Set up the mock to return a result with text_content
        mock_result = MagicMock()
        mock_result.text_content = "# Converted Markdown\n\nThis is converted content."
        mock_convert.return_value = mock_result
        yield mock_convert


# Test convert_file_to_markdown function
@pytest.mark.asyncio
async def test_convert_file_to_markdown_success(mock_markitdown_convert):
    """Test successful file conversion."""
    # Mock file operations
    with (
        patch("pathlib.Path.is_file", return_value=True),
        patch("pathlib.Path.write_text") as mock_write_text,
        patch("pathlib.Path.mkdir") as mock_mkdir,
    ):
        # Call the function
        result = await convert_file_to_markdown("input.txt", "output.md")

        # Verify the output file was written with the converted content
        mock_write_text.assert_called_once_with("# Converted Markdown\n\nThis is converted content.")

        # Verify the output directory was created
        mock_mkdir.assert_called_once_with(parents=True, exist_ok=True)

        # Verify the result is as expected
        assert result["success"] is True
        assert "input.txt" in result["input_file"]
        assert "output.md" in result["output_file"]


@pytest.mark.asyncio
async def test_convert_file_to_markdown_file_not_found():
    """Test file conversion when input file doesn't exist."""
    # Mock file operations
    with patch("pathlib.Path.is_file", return_value=False):
        # Call the function
        result = await convert_file_to_markdown("nonexistent.txt", "output.md")

        # Verify the result is as expected
        assert result["success"] is False
        assert "not found" in result["error"]


@pytest.mark.asyncio
async def test_convert_file_to_markdown_exception(mock_markitdown_convert):
    """Test file conversion when an exception occurs."""
    # Set up the mock to raise an exception
    mock_markitdown_convert.side_effect = Exception("Conversion error")

    # Mock file operations
    with (
        patch("pathlib.Path.is_file", return_value=True),
        patch("pathlib.Path.read_text", return_value="Original content"),
        patch("pathlib.Path.mkdir"),
    ):
        # Call the function and expect an exception
        with pytest.raises(Exception) as excinfo:
            await convert_file_to_markdown("input.txt", "output.md")

        # Verify the exception message
        assert "Conversion error" in str(excinfo.value)


@pytest.mark.asyncio
async def test_convert_url_to_markdown_success(mock_markitdown_convert_url):
    """Test successful file conversion."""
    # Mock file operations
    with (
        patch("pathlib.Path.write_text") as mock_write_text,
        patch("pathlib.Path.mkdir") as mock_mkdir,
    ):
        # Call the function
        result = await convert_url_to_markdown("https://example.com", "output.md")

        # Verify the output file was written with the converted content
        mock_write_text.assert_called_once_with("# Converted Markdown\n\nThis is converted content.")

        # Verify the output directory was created
        mock_mkdir.assert_called_once_with(parents=True, exist_ok=True)

        # Verify the result is as expected
        assert result["success"] is True
        assert "https://example.com" in result["url"]
        assert "output.md" in result["output_file"]

```

--------------------------------------------------------------------------------
/mcp_toolbox/web/tools.py:
--------------------------------------------------------------------------------

```python
import functools
from pathlib import Path
from typing import Annotated, Any, Literal

import anyio
from duckduckgo_search import DDGS
from httpx import AsyncClient
from pydantic import Field
from tavily import AsyncTavilyClient

from mcp_toolbox.app import mcp
from mcp_toolbox.config import Config

client = AsyncClient(
    follow_redirects=True,
)


async def get_http_content(
    url: Annotated[str, Field(description="The URL to request")],
    method: Annotated[str, Field(default="GET", description="HTTP method to use")] = "GET",
    headers: Annotated[dict[str, str] | None, Field(default=None, description="Optional HTTP headers")] = None,
    params: Annotated[dict[str, str] | None, Field(default=None, description="Optional query parameters")] = None,
    data: Annotated[dict[str, str] | None, Field(default=None, description="Optional request body data")] = None,
    timeout: Annotated[int, Field(default=60, description="Request timeout in seconds")] = 60,
) -> str:
    response = await client.request(
        method,
        url,
        headers=headers,
        params=params,
        data=data,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.text


@mcp.tool(
    description="Save HTML from a URL.",
)
async def save_html(
    url: Annotated[str, Field(description="The URL to save")],
    output_path: Annotated[str, Field(description="The path to save the HTML")],
) -> dict[str, Any]:
    output_path: Path = Path(output_path).expanduser().resolve().absolute()

    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        content = await get_http_content(url)
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to save HTML: {e!s}",
        }

    try:
        output_path.write_text(content)
        return {
            "success": True,
            "url": url,
            "output_path": output_path.as_posix(),
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to save HTML: {e!s}",
        }


@mcp.tool(
    description="Get HTML from a URL.",
)
async def get_html(url: Annotated[str, Field(description="The URL to get")]) -> dict[str, Any]:
    try:
        content = await get_http_content(url)
        return {
            "success": True,
            "url": url,
            "content": content,
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to get HTML: {e!s}",
        }


if Config().tavily_api_key:

    @mcp.tool(
        description="Search with Tavily.",
    )
    async def search_with_tavily(
        query: Annotated[str, Field(description="The search query")],
        search_deep: Annotated[
            Literal["basic", "advanced"], Field(default="basic", description="The search depth")
        ] = "basic",
        topic: Annotated[Literal["general", "news"], Field(default="general", description="The topic")] = "general",
        time_range: Annotated[
            Literal["day", "week", "month", "year", "d", "w", "m", "y"] | None,
            Field(default=None, description="The time range"),
        ] = None,
    ) -> list[dict[str, Any]] | dict[str, Any]:
        client = AsyncTavilyClient(Config().tavily_api_key)
        results = await client.search(query, search_depth=search_deep, topic=topic, time_range=time_range)
        if not results["results"]:
            return {
                "success": False,
                "error": "No search results found.",
            }
        return results["results"]


if Config().duckduckgo_api_key:

    @mcp.tool(
        description="Search with DuckDuckGo.",
    )
    async def search_with_duckduckgo(
        query: Annotated[str, Field(description="The search query")],
        max_results: Annotated[int, Field(default=10, description="The maximum number of results")] = 10,
    ) -> list[dict[str, Any]] | dict[str, Any]:
        # DuckDuckGo itself requires no API key; the config value only gates tool registration.
        ddg = DDGS()
        search = functools.partial(ddg.text, max_results=max_results)
        results = await anyio.to_thread.run_sync(search, query)
        if not results:
            return {
                "success": False,
                "error": "No search results found.",
            }
        return results

```
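
A sketch chaining the two HTML tools above; it assumes network access and an importable `mcp-toolbox`.

```python
# Sketch: fetch a page, then persist it to disk via the save tool.
import asyncio

from mcp_toolbox.web.tools import get_html, save_html


async def main() -> None:
    page = await get_html("https://example.com")
    if page["success"]:
        saved = await save_html("https://example.com", "~/Downloads/example.html")
        print(saved)


asyncio.run(main())
```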

--------------------------------------------------------------------------------
/mcp_toolbox/xiaoyuzhoufm/tools.py:
--------------------------------------------------------------------------------

```python
"""XiaoyuZhouFM podcast crawler tools."""

import os
import re
from typing import Annotated, Any

import httpx
from loguru import logger
from pydantic import Field

from mcp_toolbox.app import mcp
from mcp_toolbox.config import Config


class XiaoyuZhouFMCrawler:
    """XiaoyuZhouFM podcast crawler."""

    def __init__(self):
        """Initialize the crawler."""
        self.config = Config()

    async def extract_audio_url(self, url: str) -> str:
        """Extract audio URL from XiaoyuZhouFM episode page.

        Args:
            url: The XiaoyuZhouFM episode URL

        Returns:
            The audio URL

        Raises:
            ValueError: If the audio URL cannot be found
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url)
                response.raise_for_status()
                html_content = response.text

                # Use regex to find the og:audio meta tag
                pattern = r'<meta\s+property="og:audio"\s+content="([^"]+)"'
                match = re.search(pattern, html_content)

                if not match:
                    raise ValueError("Could not find audio URL in the page")

                audio_url = match.group(1)
                return audio_url

            except httpx.HTTPStatusError as e:
                raise ValueError(f"HTTP error: {e.response.status_code} - {e.response.reason_phrase}") from e
            except httpx.RequestError as e:
                raise ValueError(f"Request error: {e}") from e

    async def download_audio(self, audio_url: str, output_path: str) -> str:
        """Download audio file from URL.

        Args:
            audio_url: The audio file URL
            output_path: The path to save the audio file

        Returns:
            The path to the downloaded file

        Raises:
            ValueError: If the download fails
        """
        # Create directory if it doesn't exist
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        async with httpx.AsyncClient() as client:
            try:
                logger.info(f"Downloading audio from {audio_url}")
                response = await client.get(audio_url)
                response.raise_for_status()

                with open(output_path, "wb") as f:
                    f.write(response.content)

                logger.info(f"Audio saved to {output_path}")
                return output_path

            except httpx.HTTPStatusError as e:
                raise ValueError(f"HTTP error: {e.response.status_code} - {e.response.reason_phrase}") from e
            except httpx.RequestError as e:
                raise ValueError(f"Request error: {e}") from e
            except OSError as e:
                raise ValueError(f"IO error: {e}") from e


# Initialize crawler
crawler = XiaoyuZhouFMCrawler()


@mcp.tool(description="Crawl and download a podcast episode from XiaoyuZhouFM.")
async def xiaoyuzhoufm_download(
    xiaoyuzhoufm_url: Annotated[str, Field(description="The URL of the XiaoyuZhouFM episode")],
    output_dir: Annotated[str, Field(description="The directory to save the audio file")],
) -> dict[str, Any]:
    """Crawl and download a podcast episode from XiaoyuZhouFM.

    Args:
        xiaoyuzhoufm_url: The URL of the XiaoyuZhouFM episode
        output_dir: The directory to save the audio file

    Returns:
        A dictionary containing the audio URL and the path to the downloaded file
    """
    try:
        # Validate URL
        if not xiaoyuzhoufm_url.startswith("https://www.xiaoyuzhoufm.com/episode/"):
            raise ValueError("Invalid XiaoyuZhouFM URL. URL should start with 'https://www.xiaoyuzhoufm.com/episode/'")

        # Extract episode ID from URL
        episode_id = xiaoyuzhoufm_url.split("/")[-1]
        if not episode_id:
            episode_id = "episode"

        # Extract audio URL
        audio_url = await crawler.extract_audio_url(xiaoyuzhoufm_url)

        # Determine file extension from audio URL
        file_extension = "m4a"
        if "." in audio_url.split("/")[-1]:
            file_extension = audio_url.split("/")[-1].split(".")[-1]

        # Create output path with episode ID as filename
        output_path = os.path.join(output_dir, f"{episode_id}.{file_extension}")

        # Download audio
        downloaded_path = await crawler.download_audio(audio_url, output_path)

        return {
            "audio_url": audio_url,
            "downloaded_path": downloaded_path,
            "message": f"Successfully downloaded podcast to {downloaded_path}",
        }
    except Exception as e:
        return {
            "error": str(e),
            "message": f"Failed to download podcast: {e!s}",
        }

```
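
A usage sketch; the episode ID below is a hypothetical placeholder, not a real episode.

```python
# Sketch: download one episode into /tmp/podcasts (placeholder episode ID).
import asyncio

from mcp_toolbox.xiaoyuzhoufm.tools import xiaoyuzhoufm_download

result = asyncio.run(
    xiaoyuzhoufm_download(
        "https://www.xiaoyuzhoufm.com/episode/0000000000",  # hypothetical ID
        "/tmp/podcasts",
    )
)
print(result.get("downloaded_path") or result.get("error"))
```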

--------------------------------------------------------------------------------
/tests/xiaoyuzhoufm/test_xiaoyuzhoufm_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for XiaoyuZhouFM tools."""

import json
import os
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_toolbox.xiaoyuzhoufm.tools import XiaoyuZhouFMCrawler, xiaoyuzhoufm_download


# Helper function to load mock data
def load_mock_data(filename):
    mock_dir = Path(__file__).parent.parent / "mock" / "xiaoyuzhoufm"
    mock_dir.mkdir(parents=True, exist_ok=True)
    file_path = mock_dir / filename

    if not file_path.exists():
        # Create empty mock data if it doesn't exist
        mock_data = {"mock": "data"}
        with open(file_path, "w") as f:
            json.dump(mock_data, f)

    with open(file_path) as f:
        return json.load(f)


# Mock HTML content with audio URL
MOCK_HTML_CONTENT = """
<!DOCTYPE html>
<html>
<head>
    <meta property="og:audio" content="https://media.example.com/podcasts/episode123.m4a">
    <title>Test Episode</title>
</head>
<body>
    <h1>Test Episode</h1>
</body>
</html>
"""


# Test XiaoyuZhouFMCrawler.extract_audio_url method
@pytest.mark.asyncio
async def test_extract_audio_url():
    # Create a mock response
    mock_response = MagicMock()
    mock_response.text = MOCK_HTML_CONTENT
    mock_response.raise_for_status = MagicMock()  # Changed from AsyncMock to MagicMock

    # Create a mock client
    mock_client = AsyncMock()
    mock_client.__aenter__.return_value.get.return_value = mock_response

    # Patch httpx.AsyncClient
    with patch("httpx.AsyncClient", return_value=mock_client):
        crawler = XiaoyuZhouFMCrawler()
        audio_url = await crawler.extract_audio_url("https://www.xiaoyuzhoufm.com/episode/test")

    # Assert the audio URL is extracted correctly
    assert audio_url == "https://media.example.com/podcasts/episode123.m4a"


# Test XiaoyuZhouFMCrawler.download_audio method
@pytest.mark.asyncio
async def test_download_audio(tmp_path):
    # Create a mock response with binary content
    mock_response = MagicMock()
    mock_response.content = b"test audio content"
    mock_response.raise_for_status = MagicMock()  # Changed from AsyncMock to MagicMock

    # Create a mock client
    mock_client = AsyncMock()
    mock_client.__aenter__.return_value.get.return_value = mock_response

    # Patch httpx.AsyncClient
    with patch("httpx.AsyncClient", return_value=mock_client):
        crawler = XiaoyuZhouFMCrawler()
        output_path = str(tmp_path / "test_audio.m4a")
        downloaded_path = await crawler.download_audio("https://media.example.com/podcasts/episode123.m4a", output_path)

    # Assert the file was downloaded correctly
    assert downloaded_path == output_path
    assert os.path.exists(output_path)
    with open(output_path, "rb") as f:
        content = f.read()
        assert content == b"test audio content"


# Test xiaoyuzhoufm_download tool
@pytest.mark.asyncio
async def test_xiaoyuzhoufm_download():
    # Mock the crawler methods
    with (
        patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.extract_audio_url") as mock_extract,
        patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.download_audio") as mock_download,
    ):
        # Set up the mocks
        mock_extract.return_value = "https://media.example.com/podcasts/episode123.m4a"
        mock_download.return_value = "/tmp/test/test.m4a"

        # Call the tool
        result = await xiaoyuzhoufm_download("https://www.xiaoyuzhoufm.com/episode/test", "/tmp/test")

        # Assert the result
        assert result["audio_url"] == "https://media.example.com/podcasts/episode123.m4a"
        assert result["downloaded_path"] == "/tmp/test/test.m4a"
        assert "Successfully downloaded" in result["message"]

        # Verify the mocks were called correctly
        mock_extract.assert_called_once_with("https://www.xiaoyuzhoufm.com/episode/test")
        # The output path should be constructed from the output_dir and episode ID
        mock_download.assert_called_once_with("https://media.example.com/podcasts/episode123.m4a", "/tmp/test/test.m4a")


# Test xiaoyuzhoufm_download tool with invalid URL
@pytest.mark.asyncio
async def test_xiaoyuzhoufm_download_invalid_url():
    # Call the tool with an invalid URL
    result = await xiaoyuzhoufm_download("https://invalid-url.com", "/tmp/test")

    # Assert the result contains an error
    assert "error" in result
    assert "Invalid XiaoyuZhouFM URL" in result["message"]


# Test xiaoyuzhoufm_download tool with extraction error
@pytest.mark.asyncio
async def test_xiaoyuzhoufm_download_extraction_error():
    # Mock the crawler methods to raise an error
    with patch("mcp_toolbox.xiaoyuzhoufm.tools.crawler.extract_audio_url") as mock_extract:
        # Set up the mock to raise an error
        mock_extract.side_effect = ValueError("Could not find audio URL")

        # Call the tool
        result = await xiaoyuzhoufm_download("https://www.xiaoyuzhoufm.com/episode/test", "/tmp/test")

        # Assert the result contains an error
        assert "error" in result
        assert "Could not find audio URL" in result["error"]
        assert "Failed to download podcast" in result["message"]

```

--------------------------------------------------------------------------------
/tests/enhance/test_memory.py:
--------------------------------------------------------------------------------

```python
import pytest

from mcp_toolbox.enhance.memory import LocalMemory, MemoryModel


@pytest.fixture
def memory_file(tmp_path):
    return tmp_path / "test-memory"


@pytest.fixture
def local_memory(memory_file):
    memory = LocalMemory("test-session", memory_file)
    # Ensure the file is empty at the start of each test
    memory.clear()
    return memory


def test_memory_basic(local_memory: LocalMemory):
    """Test basic memory operations"""
    assert local_memory.session_id == "test-session"

    # Store and query
    memory_model = local_memory.store("test-brief", "test-detail")
    assert isinstance(memory_model, MemoryModel)
    assert memory_model.session_id == "test-session"
    assert memory_model.brief == "test-brief"
    assert memory_model.detail == "test-detail"
    assert memory_model.embedding is not None

    # Query
    results = local_memory.query("test-brief")
    assert len(results) == 1
    assert results[0].brief == "test-brief"
    assert results[0].detail == "test-detail"
    assert results[0].session_id == "test-session"


def test_memory_cross_session(memory_file):
    """Test cross-session memory operations"""
    # Create two memory instances with different session IDs
    memory1 = LocalMemory("session-1", memory_file)
    memory1.clear()  # Start with a clean file

    # Store a memory in session 1
    memory1.store("brief-1", "detail-1")

    # Create a second memory instance with a different session ID
    memory2 = LocalMemory("session-2", memory_file)

    # Store a memory in session 2
    memory2.store("brief-2", "detail-2")

    # Refresh memory1 to see both entries
    memory1.current_memory = memory1._load()

    # Query with cross_session=True (default)
    results1 = memory1.query("brief", top_k=5, refresh=True)
    assert len(results1) == 2, f"Expected 2 results, got {len(results1)}: {results1}"

    # Query with cross_session=False
    results2 = memory1.query("brief", top_k=5, cross_session=False)
    assert len(results2) == 1, f"Expected 1 result, got {len(results2)}: {results2}"
    assert results2[0].session_id == "session-1"

    results3 = memory2.query("brief", top_k=5, cross_session=False)
    assert len(results3) == 1, f"Expected 1 result, got {len(results3)}: {results3}"
    assert results3[0].session_id == "session-2"


def test_memory_clear(memory_file):
    """Test clearing memory"""
    # Create a new memory instance
    memory = LocalMemory("test-session", memory_file)
    memory.clear()  # Start with a clean file

    # Store some memories
    memory.store("brief-1", "detail-1")
    memory.store("brief-2", "detail-2")

    # Verify memories are stored
    results = memory.query("brief", top_k=5)
    assert len(results) == 2, f"Expected 2 results, got {len(results)}: {results}"

    # Clear memories
    memory.clear()

    # Verify memories are cleared
    results = memory.query("brief", top_k=5)
    assert len(results) == 0, f"Expected 0 results, got {len(results)}: {results}"


def test_memory_empty_file(memory_file):
    """Test handling of empty memory file"""
    # Create a new memory instance with a non-existent file
    memory = LocalMemory("test-session", memory_file)
    memory.clear()  # Start with a clean file

    # Query should return empty list
    results = memory.query("test")
    assert len(results) == 0

    # Store should work even with empty file
    memory.store("test-brief", "test-detail")
    results = memory.query("test")
    assert len(results) == 1


def test_memory_top_k(memory_file):
    """Test top_k parameter in query"""
    # Create a new memory instance
    memory = LocalMemory("test-session", memory_file)
    memory.clear()  # Start with a clean file

    # Store multiple memories with distinct embeddings
    memory.store("apple", "A fruit")
    memory.store("banana", "A yellow fruit")
    memory.store("orange", "A citrus fruit")
    memory.store("grape", "A small fruit")

    # Query with different top_k values
    results1 = memory.query("fruit", top_k=2)
    assert len(results1) == 2, f"Expected 2 results, got {len(results1)}: {results1}"

    results2 = memory.query("fruit", top_k=4)
    assert len(results2) == 4, f"Expected 4 results, got {len(results2)}: {results2}"

    # Query with top_k larger than available results
    results3 = memory.query("fruit", top_k=10)
    assert len(results3) == 4, f"Expected 4 results, got {len(results3)}: {results3}"


def test_memory_refresh(memory_file):
    """Test refresh parameter in query"""
    # Create two memory instances with the same session ID and file
    memory1 = LocalMemory("same-session", memory_file)
    memory1.clear()  # Start with a clean file

    memory2 = LocalMemory("same-session", memory_file)

    # Store a memory using the first instance
    memory1.store("test-brief", "test-detail")

    # Query using the second instance without refresh
    results1 = memory2.query("test", refresh=False)
    assert len(results1) == 0, f"Expected 0 results, got {len(results1)}: {results1}"

    # Query using the second instance with refresh
    results2 = memory2.query("test", refresh=True)
    assert len(results2) == 1, f"Expected 1 result, got {len(results2)}: {results2}"

```

--------------------------------------------------------------------------------
/tests/command_line/test_command_line_tools.py:
--------------------------------------------------------------------------------

```python
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_toolbox.command_line.tools import execute_command


# Mock for asyncio.create_subprocess_exec
class MockProcess:
    def __init__(self, stdout=b"", stderr=b"", returncode=0):
        self.stdout = stdout
        self.stderr = stderr
        self.returncode = returncode
        self.communicate = AsyncMock(return_value=(stdout, stderr))
        self.kill = MagicMock()


@pytest.mark.asyncio
async def test_execute_command_success():
    """Test successful command execution."""
    # Mock process with successful execution
    mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0)

    with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec:
        result = await execute_command(["echo", "test"])

        # Verify subprocess was called with correct arguments
        mock_exec.assert_called_once()

        # Verify the result contains expected fields
        assert "stdout" in result
        assert "stderr" in result
        assert "return_code" in result
        assert result["stdout"] == "test output"
        assert result["stderr"] == ""
        assert result["return_code"] == 0


@pytest.mark.asyncio
async def test_execute_command_error():
    """Test command execution with error."""
    # Mock process with error
    mock_process = MockProcess(stdout=b"", stderr=b"error message", returncode=1)

    with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec:
        result = await execute_command(["invalid_command"])

        # Verify subprocess was called
        mock_exec.assert_called_once()

        # Verify the result contains expected fields
        assert "stdout" in result
        assert "stderr" in result
        assert "return_code" in result
        assert result["stdout"] == ""
        assert result["stderr"] == "error message"
        assert result["return_code"] == 1


@pytest.mark.asyncio
async def test_execute_command_timeout():
    """Test command execution timeout."""
    # Mock process that will time out
    mock_process = MockProcess()
    mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())

    with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec:
        result = await execute_command(["sleep", "100"], timeout_seconds=1)

        # Verify subprocess was called
        mock_exec.assert_called_once()

        # Verify process was killed
        mock_process.kill.assert_called_once()

        # Verify the result contains expected fields
        assert "error" in result
        assert "timed out" in result["error"]
        assert result["return_code"] == 124  # Timeout return code


@pytest.mark.asyncio
async def test_execute_command_exception():
    """Test exception during command execution."""
    with patch("asyncio.create_subprocess_exec", side_effect=Exception("Test exception")) as mock_exec:
        result = await execute_command(["echo", "test"])

        # Verify subprocess was called
        mock_exec.assert_called_once()

        # Verify the result contains expected fields
        assert "error" in result
        assert "Failed to execute command" in result["error"]
        assert result["return_code"] == 1


@pytest.mark.asyncio
async def test_execute_command_empty():
    """Test execution with empty command."""
    result = await execute_command([])

    # Verify the result contains expected fields
    assert "error" in result
    assert "Command cannot be empty" in result["error"]
    assert result["return_code"] == 1


@pytest.mark.asyncio
async def test_execute_command_with_working_dir():
    """Test command execution with working directory."""
    # Mock process with successful execution
    mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0)
    test_dir = "/test_dir"  # Using a non-tmp directory for testing

    with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec:
        result = await execute_command(["echo", "test"], working_dir=test_dir)

        # Verify subprocess was called with correct arguments
        mock_exec.assert_called_once()
        _, kwargs = mock_exec.call_args
        assert kwargs["cwd"] == Path(test_dir)

        # Verify the result contains expected fields
        assert result["return_code"] == 0


@pytest.mark.asyncio
async def test_execute_command_with_tilde_in_working_dir():
    """Test command execution with tilde in working directory."""
    # Mock process with successful execution
    mock_process = MockProcess(stdout=b"test output", stderr=b"", returncode=0)
    test_dir = "~/test_dir"  # Using tilde in path

    with (
        patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec,
        patch("pathlib.Path.expanduser", return_value=Path("/home/user/test_dir")) as mock_expanduser,
    ):
        result = await execute_command(["echo", "test"], working_dir=test_dir)

        # Verify expanduser was called
        mock_expanduser.assert_called_once()

        # Verify subprocess was called with correct arguments
        mock_exec.assert_called_once()
        _, kwargs = mock_exec.call_args
        assert kwargs["cwd"] == Path("/home/user/test_dir")

        # Verify the result contains expected fields
        assert result["return_code"] == 0

```

--------------------------------------------------------------------------------
/mcp_toolbox/enhance/memory.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

import uuid
from functools import cache
from os import PathLike
from pathlib import Path
from typing import Annotated

import numpy as np
import portalocker
from fastembed import TextEmbedding
from pydantic import BaseModel, Field

from mcp_toolbox.config import Config
from mcp_toolbox.log import logger

embedding_model = TextEmbedding()
logger.info("The model BAAI/bge-small-en-v1.5 is ready to use.")


def embed_text(text: str) -> list[float]:
    return next(iter(embedding_model.embed([text])))


class MemoryModel(BaseModel):
    session_id: Annotated[str, Field(description="The session id of the memory")]
    brief: Annotated[str, Field(description="The brief information of the memory")]
    detail: Annotated[str, Field(description="The detailed information of the brief text")]
    embedding: Annotated[list[float] | None, Field(description="The embedding of the brief text")] = None


@cache
def get_current_session_memory() -> LocalMemory:
    return LocalMemory.new_session()


class LocalMemory:
    @classmethod
    def new_session(cls) -> LocalMemory:
        return cls.use_session(uuid.uuid4().hex)

    @classmethod
    def use_session(cls, session_id: str) -> LocalMemory:
        config = Config()
        return cls(session_id, config.memory_file)

    def __init__(self, session_id: str, memory_file: PathLike):
        self.session_id = session_id
        self.memory_file = Path(memory_file)

        self.memory_file.parent.mkdir(parents=True, exist_ok=True)
        self.memory_file.touch(exist_ok=True)
        self.current_memory: np.ndarray = self._load()

    def _load(self) -> np.ndarray:
        if not self.memory_file.exists():
            return np.empty((0, 4), dtype=object)

        try:
            with portalocker.Lock(self.memory_file, "rb") as f:
                memory = np.load(f, allow_pickle=True)
        except Exception as e:
            logger.warning(f"Error loading memory: {e}")
            memory = np.empty((0, 4), dtype=object)

        return memory

    def store(self, brief: str, detail: str) -> MemoryModel:
        try:
            # Keep the file locked during the entire operation
            with portalocker.Lock(self.memory_file, "rb+") as f:
                try:
                    # Load existing memory
                    current_memory = np.load(f, allow_pickle=True)
                except (ValueError, EOFError):
                    # File is empty or not a valid numpy array
                    current_memory = np.empty((0, 4), dtype=object)

                embedding = embed_text(brief)

                # Append the new entry
                if current_memory.size == 0:
                    # Initialize with first entry including all 4 fields
                    updated_memory = np.array([[self.session_id, brief, detail, embedding]], dtype=object)
                else:
                    updated_memory = np.append(
                        current_memory,
                        np.array([[self.session_id, brief, detail, embedding]], dtype=object),
                        axis=0,
                    )

                # Save the updated memory
                f.seek(0)
                f.truncate()
                np.save(f, updated_memory)
        except Exception as e:
            logger.warning(f"Error storing memory: {e}")
            raise

        self.current_memory = self._load()

        return MemoryModel(
            session_id=self.session_id,
            brief=brief,
            detail=detail,
            embedding=embedding,
        )

    def query(
        self,
        query: str,
        top_k: int = 3,
        cross_session: bool = True,
        refresh: bool = False,
    ) -> list[MemoryModel]:
        if refresh:
            self.current_memory = self._load()
        embedding = embed_text(query)
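        # Note: similarity below is a raw dot product; it equals cosine similarity
        # only for L2-normalized vectors, which the default BAAI/bge-small-en-v1.5
        # fastembed model is expected to produce.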

        # Check if memory is empty
        if self.current_memory.size == 0:
            return []

        # Restrict to the current session unless cross-session search is requested
        if cross_session:
            filtered_memory = self.current_memory
        else:
            # Create a mask for entries from the current session
            session_mask = self.current_memory[:, 0] == self.session_id
            if not any(session_mask):
                return []  # No entries for current session

            # Filter memory to only include current session
            filtered_memory = self.current_memory[session_mask]

        # Calculate similarity between query embedding and each stored embedding
        similarity = np.array([np.dot(stored_embedding, embedding) for stored_embedding in filtered_memory[:, 3]])
        top_k_idx = np.argsort(similarity)[-min(top_k, len(similarity)) :]

        return [
            MemoryModel(
                session_id=filtered_memory[idx, 0],
                brief=filtered_memory[idx, 1],
                detail=filtered_memory[idx, 2],
            )
            for idx in top_k_idx
        ]

    def clear(self):
        # Create an empty memory array
        empty_memory = np.empty((0, 4), dtype=object)

        # Update the file with the empty array
        with portalocker.Lock(self.memory_file, "wb") as f:
            np.save(f, empty_memory)

        # Update the current memory
        self.current_memory = empty_memory

```
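
A minimal usage sketch for `LocalMemory` (session id and strings are illustrative; in real use `Config().memory_file` supplies the store location):

```python
from mcp_toolbox.enhance.memory import LocalMemory

memory = LocalMemory.use_session("demo-session")
memory.store("user prefers dark mode", "Toggled in settings during onboarding")

# Retrieve the briefs most similar to the query (dot-product ranking)
for item in memory.query("which theme does the user like?", top_k=1):
    print(item.brief, "->", item.detail)

memory.clear()  # wipe the shared memory file
```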

--------------------------------------------------------------------------------
/tests/enhance/test_enhance_tools.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import patch

import pytest

from mcp_toolbox.enhance.memory import LocalMemory, MemoryModel
from mcp_toolbox.enhance.tools import forget, get_session_id, recall, remember, think


@pytest.mark.asyncio
async def test_think_returns_dict():
    """Test that the think function returns a dictionary."""
    result = await think("Test thought")
    assert isinstance(result, dict), "think() should return a dictionary"


@pytest.mark.asyncio
async def test_think_returns_correct_thought():
    """Test that the returned dictionary contains the input thought."""
    test_thought = "This is a test thought"
    result = await think(test_thought)
    assert result == {"thought": test_thought}, "think() should return a dictionary with the input thought"


@pytest.mark.asyncio
async def test_think_with_different_thought_types():
    """Test think() with various types of thoughts."""
    test_cases = [
        "Simple string thought",
        "Thought with special characters: !@#$%^&*()",
        "Thought with numbers: 12345",
        "Thought with unicode: こんにちは 世界",
        "",  # Empty string
    ]

    for test_thought in test_cases:
        result = await think(test_thought)
        assert result == {"thought": test_thought}, f"Failed for thought: {test_thought}"


@pytest.fixture
def memory_file(tmp_path):
    return tmp_path / "test-memory"


@pytest.fixture
def mock_memory(memory_file):
    memory = LocalMemory("test-session", memory_file)
    memory.clear()  # Start with a clean file
    return memory


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_get_session_id(mock_get_memory, mock_memory):
    """Test that get_session_id returns the correct session ID."""
    mock_get_memory.return_value = mock_memory

    result = get_session_id()

    assert result == {"session_id": "test-session"}
    mock_get_memory.assert_called_once()


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_remember(mock_get_memory, mock_memory):
    """Test that remember stores a memory and returns the correct data."""
    mock_get_memory.return_value = mock_memory

    result = remember("test-brief", "test-detail")

    assert result == {
        "session_id": "test-session",
        "brief": "test-brief",
        "detail": "test-detail",
    }
    mock_get_memory.assert_called_once()

    # Verify the memory was stored
    memories = mock_memory.query("test-brief")
    assert len(memories) == 1
    assert memories[0].brief == "test-brief"
    assert memories[0].detail == "test-detail"


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_recall_current_session(mock_get_memory, mock_memory):
    """Test that recall retrieves memories from the current session."""
    mock_get_memory.return_value = mock_memory

    # Store some memories
    mock_memory.store("brief-1", "detail-1")
    mock_memory.store("brief-2", "detail-2")

    # Recall with default parameters (current session)
    result = recall("brief")

    assert len(result) == 2
    assert all(isinstance(item, dict) for item in result)
    assert all("session_id" in item and "brief" in item and "detail" in item for item in result)
    mock_get_memory.assert_called()


@patch("mcp_toolbox.enhance.tools.LocalMemory.use_session")
def test_recall_specific_session(mock_use_session, mock_memory):
    """Test that recall retrieves memories from a specific session."""
    mock_use_session.return_value = mock_memory

    # Store some memories
    mock_memory.store("brief-1", "detail-1")

    # Recall with specific session ID
    result = recall("brief", session_id="specific-session")

    assert len(result) == 1
    mock_use_session.assert_called_once_with("specific-session")


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_recall_cross_session(mock_get_memory, mock_memory):
    """Test that recall retrieves memories across sessions when cross_session is True."""
    mock_get_memory.return_value = mock_memory

    # Mock the query method to simulate cross-session behavior
    original_query = mock_memory.query

    def mock_query(query_text, top_k=3, cross_session=True):
        if cross_session:
            return [
                MemoryModel(session_id="session-1", brief="brief-1", detail="detail-1"),
                MemoryModel(session_id="session-2", brief="brief-2", detail="detail-2"),
            ]
        else:
            return [MemoryModel(session_id="test-session", brief="brief-1", detail="detail-1")]

    mock_memory.query = mock_query

    # Recall with cross_session=True
    result = recall("brief", cross_session=True)

    assert len(result) == 2
    assert result[0]["session_id"] == "session-1"
    assert result[1]["session_id"] == "session-2"

    # Recall with cross_session=False
    result = recall("brief", cross_session=False)

    assert len(result) == 1
    assert result[0]["session_id"] == "test-session"

    # Restore original query method
    mock_memory.query = original_query


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_recall_top_k(mock_get_memory, mock_memory):
    """Test that recall respects the top_k parameter."""
    mock_get_memory.return_value = mock_memory

    # Store multiple memories
    for i in range(10):
        mock_memory.store(f"brief-{i}", f"detail-{i}")

    # Recall with top_k=3
    result = recall("brief", top_k=3)

    assert len(result) <= 3
    mock_get_memory.assert_called()


@patch("mcp_toolbox.enhance.tools.get_current_session_memory")
def test_forget(mock_get_memory, mock_memory):
    """Test that forget clears all memories."""
    mock_get_memory.return_value = mock_memory

    # Store some memories
    mock_memory.store("brief-1", "detail-1")
    mock_memory.store("brief-2", "detail-2")

    # Verify memories are stored
    assert len(mock_memory.query("brief")) == 2

    # Forget all memories
    result = forget()

    assert result == {"message": "All memories are cleared."}
    mock_get_memory.assert_called()

    # Verify memories are cleared
    assert len(mock_memory.query("brief")) == 0

```
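
One detail worth noting: these tests patch `mcp_toolbox.enhance.tools.get_current_session_memory`, i.e. the name as imported into the tools module, not where it is defined in `mcp_toolbox.enhance.memory`. Patching the use site is what makes the tools pick up the fixture-backed memory:

```python
from unittest.mock import patch

from mcp_toolbox.enhance.memory import LocalMemory

# Patch where the name is looked up (the tools module), not where it is defined
with patch("mcp_toolbox.enhance.tools.get_current_session_memory") as mock_get_memory:
    mock_get_memory.return_value = LocalMemory("test-session", "/tmp/test-memory")
```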

--------------------------------------------------------------------------------
/mcp_toolbox/audio/tools.py:
--------------------------------------------------------------------------------

```python
"""Audio processing tools for transcription and analysis."""

import datetime
import os
from pathlib import Path
from typing import Annotated, Any

import whisper
from loguru import logger
from pydantic import Field

from mcp_toolbox.app import mcp

# Global variables to cache model and audio data
_model = None
_model_name = None
_audio = None
_audio_path = None
_detected_language = None


def load_model(model_name="base"):
    """
    Load and cache the Whisper model.

    Args:
        model_name: The name of the Whisper model to load (tiny, base, small, medium, large)

    Returns:
        The loaded Whisper model
    """
    global _model, _model_name

    # Load model if not loaded or if model name has changed
    if _model is None or _model_name != model_name:
        logger.info(f"Loading Whisper model: {model_name}")
        _model = whisper.load_model(model_name)
        _model_name = model_name

    return _model


def load_audio(audio_path, model_name="base"):
    """
    Load and cache the audio file.

    Args:
        audio_path: The path to the audio file
        model_name: The name of the Whisper model to use for language detection

    Returns:
        The loaded audio data
    """
    global _audio, _audio_path, _detected_language, _model

    # Ensure model is loaded
    model = load_model(model_name)

    # Only reload if it's a different file or not loaded yet
    audio_path = Path(audio_path).expanduser().resolve().absolute().as_posix()
    if _audio is None or _audio_path != audio_path:
        logger.info(f"Loading audio: {audio_path}")
        _audio = whisper.load_audio(audio_path)
        _audio_path = audio_path

        # Get audio duration in seconds
        audio_duration = len(_audio) / 16000  # Whisper uses 16kHz audio
        logger.info(f"Audio duration: {datetime.timedelta(seconds=int(audio_duration))!s}")

        # Detect language from the first chunk
        chunk_samples = 30 * 16000  # Use 30 seconds for language detection
        first_chunk = whisper.pad_or_trim(_audio[:chunk_samples])
        mel = whisper.log_mel_spectrogram(first_chunk).to(model.device)
        _, probs = model.detect_language(mel)
        _detected_language = max(probs, key=probs.get)
        logger.info(f"Detected language: {_detected_language}")

    return _audio


@mcp.tool(description="Get the length of an audio file in seconds.")
async def get_audio_length(
    audio_path: Annotated[str, Field(description="The path to the audio file")],
) -> dict[str, Any]:
    """Get the length of an audio file in seconds.

    Args:
        audio_path: The path to the audio file

    Returns:
        A dictionary containing the audio length in seconds and formatted time
    """
    try:
        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file not found: {audio_path}")

        # Load audio
        audio = whisper.load_audio(audio_path)

        # Calculate duration
        audio_duration_seconds = len(audio) / 16000  # Whisper uses 16kHz audio
        formatted_duration = str(datetime.timedelta(seconds=int(audio_duration_seconds)))

        return {
            "duration_seconds": audio_duration_seconds,
            "formatted_duration": formatted_duration,
            "message": f"Audio length: {formatted_duration} ({audio_duration_seconds:.2f} seconds)",
        }
    except Exception as e:
        return {
            "error": str(e),
            "message": f"Failed to get audio length: {e!s}",
        }


@mcp.tool(description="Get transcribed text from a specific time range in an audio file.")
async def get_audio_text(
    audio_path: Annotated[str, Field(description="The path to the audio file")],
    start_time: Annotated[float, Field(description="Start time in seconds")],
    end_time: Annotated[float, Field(description="End time in seconds")],
    model_name: Annotated[
        str, Field(default="base", description="Whisper model name: tiny, base, small, medium, large")
    ] = "base",
) -> dict[str, Any]:
    """Extract and transcribe text from a specific time range in an audio file.

    Args:
        audio_path: The path to the audio file
        start_time: Start time in seconds
        end_time: End time in seconds
        model_name: Whisper model name (tiny, base, small, medium, large)

    Returns:
        A dictionary containing the transcribed text and time range
    """
    try:
        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file not found: {audio_path}")

        # Load model and audio (uses cached versions if already loaded);
        # loading the audio also detects the language used to pick the prompt
        model = load_model(model_name)
        audio = load_audio(audio_path, model_name)

        if _detected_language == "zh":
            initial_prompt = "以下是普通话的句子"
        elif _detected_language == "en":
            initial_prompt = "The following is English speech"
        else:
            initial_prompt = ""

        # Convert times to sample indices
        sample_rate = 16000  # Whisper uses 16kHz audio
        start_sample = int(start_time * sample_rate)
        end_sample = int(end_time * sample_rate)

        # Ensure indices are within bounds
        audio_length = len(audio)
        start_sample = max(0, min(start_sample, audio_length - 1))
        end_sample = max(start_sample, min(end_sample, audio_length))

        # Extract the requested audio segment
        segment = audio[start_sample:end_sample]

        # If segment is too short, pad it
        if len(segment) < 0.5 * sample_rate:  # Less than 0.5 seconds
            logger.warning("Audio segment is very short, results may be poor")
            segment = whisper.pad_or_trim(segment, int(0.5 * sample_rate))

        # Transcribe the segment
        result = model.transcribe(
            segment,
            language=_detected_language,
            initial_prompt=initial_prompt,
            verbose=False,
        )

        # Format time range for display
        start_formatted = str(datetime.timedelta(seconds=int(start_time)))
        end_formatted = str(datetime.timedelta(seconds=int(end_time)))

        # Extract and return the text
        transcribed_text = result["text"].strip()

        return {
            "text": transcribed_text,
            "start_time": start_time,
            "end_time": end_time,
            "time_range": f"{start_formatted} - {end_formatted}",
            "language": _detected_language,
            "message": "Successfully transcribed audio",
        }
    except Exception as e:
        return {
            "error": str(e),
            "message": f"Failed to transcribe audio: {e!s}",
        }

```
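
A quick usage sketch for these tools (the audio path is illustrative; on failure both tools return a dictionary with an `error` key instead of raising):

```python
import asyncio

from mcp_toolbox.audio.tools import get_audio_length, get_audio_text


async def main() -> None:
    info = await get_audio_length("/path/to/podcast.mp3")
    print(info.get("message"))

    # Transcribe the first 30 seconds with the cached "base" model
    result = await get_audio_text("/path/to/podcast.mp3", start_time=0.0, end_time=30.0)
    print(result.get("text"))


asyncio.run(main())
```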

--------------------------------------------------------------------------------
/mcp_toolbox/flux/api.py:
--------------------------------------------------------------------------------

```python
import asyncio
import io
import os
from pathlib import Path

import httpx
from PIL import Image

API_URL = "https://api.bfl.ml"
API_ENDPOINTS = {
    "flux.1-pro": "flux-pro",
    "flux.1-dev": "flux-dev",
    "flux.1.1-pro": "flux-pro-1.1",
}


class ApiException(Exception):
    def __init__(self, status_code: int, detail: str | list[dict] | None = None):
        super().__init__()
        self.detail = detail
        self.status_code = status_code

    def __str__(self) -> str:
        return self.__repr__()

    def __repr__(self) -> str:
        if self.detail is None:
            message = None
        elif isinstance(self.detail, str):
            message = self.detail
        else:
            message = "[" + ",".join(d["msg"] for d in self.detail) + "]"
        return f"ApiException({self.status_code=}, {message=}, detail={self.detail})"


class ImageRequest:
    def __init__(
        self,
        # api inputs
        prompt: str,
        name: str = "flux.1.1-pro",
        width: int | None = None,
        height: int | None = None,
        num_steps: int | None = None,
        prompt_upsampling: bool | None = None,
        seed: int | None = None,
        guidance: float | None = None,
        interval: float | None = None,
        safety_tolerance: int | None = None,
        # behavior of this class
        validate: bool = True,
        api_key: str | None = None,
    ):
        """
        Manages an image generation request to the API.

        All parameters not specified will use the API defaults.

        Args:
            prompt: Text prompt for image generation.
            width: Width of the generated image in pixels. Must be a multiple of 32.
            height: Height of the generated image in pixels. Must be a multiple of 32.
            name: Which model version to use
            num_steps: Number of steps for the image generation process.
            prompt_upsampling: Whether to perform upsampling on the prompt.
            seed: Optional seed for reproducibility.
            guidance: Guidance scale for image generation.
            safety_tolerance: Tolerance level for input and output moderation.
                 Between 0 and 6, 0 being most strict, 6 being least strict.
            validate: Run input validation
            api_key: Your API key if not provided by the environment

        Raises:
            ValueError: For invalid input, when `validate`
            ApiException: For errors raised from the API
        """
        if validate:
            if name not in API_ENDPOINTS:
                raise ValueError(f"Invalid model {name}")
            elif width is not None and width % 32 != 0:
                raise ValueError(f"width must be divisible by 32, got {width}")
            elif width is not None and not (256 <= width <= 1440):
                raise ValueError(f"width must be between 256 and 1440, got {width}")
            elif height is not None and height % 32 != 0:
                raise ValueError(f"height must be divisible by 32, got {height}")
            elif height is not None and not (256 <= height <= 1440):
                raise ValueError(f"height must be between 256 and 1440, got {height}")
            elif num_steps is not None and not (1 <= num_steps <= 50):
                raise ValueError(f"steps must be between 1 and 50, got {num_steps}")
            elif guidance is not None and not (1.5 <= guidance <= 5.0):
                raise ValueError(f"guidance must be between 1.5 and 4, got {guidance}")
            elif interval is not None and not (1.0 <= interval <= 4.0):
                raise ValueError(f"interval must be between 1 and 4, got {interval}")
            elif safety_tolerance is not None and not (0 <= safety_tolerance <= 6.0):
                raise ValueError(f"safety_tolerance must be between 0 and 6, got {interval}")

            if name == "flux.1-dev":
                if interval is not None:
                    raise ValueError("Interval is not supported for flux.1-dev")
            if name == "flux.1.1-pro":
                if interval is not None or num_steps is not None or guidance is not None:
                    raise ValueError("Interval, num_steps and guidance are not supported for flux.1.1-pro")

        self.name = name
        self.request_json = {
            "prompt": prompt,
            "width": width,
            "height": height,
            "steps": num_steps,
            "prompt_upsampling": prompt_upsampling,
            "seed": seed,
            "guidance": guidance,
            "interval": interval,
            "safety_tolerance": safety_tolerance,
        }
        self.request_json = {key: value for key, value in self.request_json.items() if value is not None}

        self.request_id: str | None = None
        self.result: dict | None = None
        self._image_bytes: bytes | None = None
        self._url: str | None = None
        if api_key is None:
            self.api_key = os.environ.get("BFL_API_KEY")
        else:
            self.api_key = api_key

    async def request(self):
        """
        Request to generate the image.
        """
        if self.request_id is not None:
            return
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/v1/{API_ENDPOINTS[self.name]}",
                headers={
                    "accept": "application/json",
                    "x-key": self.api_key,
                    "Content-Type": "application/json",
                },
                json=self.request_json,
            )
            result = response.json()
            if response.status_code != 200:
                raise ApiException(status_code=response.status_code, detail=result.get("detail"))
            self.request_id = result["id"]

    async def retrieve(self) -> dict:
        """
        Wait for the generation to finish and retrieve response.
        """
        if self.request_id is None:
            await self.request()
        while self.result is None:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{API_URL}/v1/get_result",
                    headers={
                        "accept": "application/json",
                        "x-key": self.api_key,
                    },
                    params={
                        "id": self.request_id,
                    },
                )
                result = response.json()
                if "status" not in result:
                    raise ApiException(status_code=response.status_code, detail=result.get("detail"))
                elif result["status"] == "Ready":
                    self.result = result["result"]
                elif result["status"] == "Pending":
                    await asyncio.sleep(0.5)
                else:
                    raise ApiException(status_code=200, detail=f"API returned status '{result['status']}'")
        return self.result

    async def get_bytes(self) -> bytes:
        """
        Generated image as bytes.
        """
        if self._image_bytes is None:
            url = await self.get_url()
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                if response.status_code == 200:
                    self._image_bytes = response.content
                else:
                    raise ApiException(status_code=response.status_code)
        return self._image_bytes

    async def get_url(self) -> str:
        """
        Public url to retrieve the image from
        """
        if self._url is None:
            result = await self.retrieve()
            self._url = result["sample"]
        return self._url

    async def get_image(self) -> Image.Image:
        """
        Load the image as a PIL Image
        """
        bytes_data = await self.get_bytes()
        return Image.open(io.BytesIO(bytes_data))

    async def save(self, path: str) -> str:
        """
        Save the generated image to a local path

        Args:
            path: The path to save the image to

        Returns:
            The full path where the image was saved
        """
        url = await self.get_url()
        suffix = Path(url).suffix
        if not path.endswith(suffix):
            path = path + suffix
        Path(path).resolve().parent.mkdir(parents=True, exist_ok=True)
        bytes_data = await self.get_bytes()
        with open(path, "wb") as file:
            file.write(bytes_data)
        return path

```
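
A usage sketch for `ImageRequest` (assumes `BFL_API_KEY` is set in the environment; prompt and output path are illustrative):

```python
import asyncio

from mcp_toolbox.flux.api import ImageRequest


async def main() -> None:
    request = ImageRequest(prompt="a watercolor fox", width=1024, height=768)
    # save() drives the full request/poll/download flow and appends the
    # server-side file suffix (e.g. ".jpg") if the path does not end with it
    saved_path = await request.save("outputs/fox")
    print(f"Image saved to {saved_path}")


asyncio.run(main())
```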

--------------------------------------------------------------------------------
/tests/web/test_web_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for web tools."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from httpx import HTTPStatusError, RequestError, Response

from mcp_toolbox.web.tools import (
    get_html,
    get_http_content,
    save_html,
)

# Check if optional dependencies are available
try:
    from mcp_toolbox.web.tools import search_with_tavily

    TAVILY_AVAILABLE = True
except ImportError:
    TAVILY_AVAILABLE = False

try:
    from mcp_toolbox.web.tools import search_with_duckduckgo

    DUCKDUCKGO_AVAILABLE = True
except ImportError:
    DUCKDUCKGO_AVAILABLE = False


# Mock HTML content for testing
MOCK_HTML_CONTENT = """
<!DOCTYPE html>
<html>
<head>
    <title>Test Page</title>
</head>
<body>
    <h1>Hello World</h1>
    <p>This is a test page.</p>
</body>
</html>
"""


# Helper function to create a mock response
def create_mock_response(status_code=200, content=MOCK_HTML_CONTENT):
    mock_response = MagicMock(spec=Response)
    mock_response.status_code = status_code
    mock_response.text = content
    mock_response.raise_for_status = MagicMock()
    if status_code >= 400:
        mock_response.raise_for_status.side_effect = HTTPStatusError(
            "HTTP Error", request=MagicMock(), response=mock_response
        )
    return mock_response


# Test get_http_content function
@pytest.mark.asyncio
async def test_get_http_content_success():
    """Test successful HTTP request."""
    # Create a mock response
    mock_response = create_mock_response()

    # Create a mock client
    mock_client = AsyncMock()
    mock_client.request.return_value = mock_response

    # Patch the client
    with patch("mcp_toolbox.web.tools.client", mock_client):
        # Call the function
        result = await get_http_content("https://example.com")

        # Verify the client was called with the correct arguments
        mock_client.request.assert_called_once_with(
            "GET",
            "https://example.com",
            headers=None,
            params=None,
            data=None,
            timeout=60,
        )

        # Verify the result is as expected
        assert result == MOCK_HTML_CONTENT


@pytest.mark.asyncio
async def test_get_http_content_error():
    """Test HTTP request with error."""
    # Create a mock response with error status
    mock_response = create_mock_response(status_code=404)

    # Create a mock client
    mock_client = AsyncMock()
    mock_client.request.return_value = mock_response

    # Patch the client and expect an exception
    with patch("mcp_toolbox.web.tools.client", mock_client), pytest.raises(HTTPStatusError):
        await get_http_content("https://example.com")


@pytest.mark.asyncio
async def test_get_http_content_request_error():
    """Test HTTP request with request error."""
    # Create a mock client that raises a RequestError
    mock_client = AsyncMock()
    mock_client.request.side_effect = RequestError("Connection error", request=MagicMock())

    # Patch the client and expect an exception
    with patch("mcp_toolbox.web.tools.client", mock_client), pytest.raises(RequestError):
        await get_http_content("https://example.com")


# Test save_html tool
@pytest.mark.asyncio
async def test_save_html_success():
    """Test successful saving of HTML."""
    # Mock get_http_content to return HTML content
    with (
        patch("mcp_toolbox.web.tools.get_http_content", return_value=MOCK_HTML_CONTENT),
        patch("pathlib.Path.write_text") as mock_write_text,
        patch("pathlib.Path.mkdir") as mock_mkdir,
    ):
        # Call the function
        result = await save_html("https://example.com", "/tmp/test.html")

        # Verify the result is as expected
        assert result["success"] is True
        assert result["url"] == "https://example.com"
        assert "/tmp/test.html" in result["output_path"]

        # Verify the file operations were called
        mock_mkdir.assert_called_once_with(parents=True, exist_ok=True)
        mock_write_text.assert_called_once_with(MOCK_HTML_CONTENT)


@pytest.mark.asyncio
async def test_save_html_network_error():
    """Test saving HTML with network error."""
    # Mock get_http_content to raise an exception
    with patch(
        "mcp_toolbox.web.tools.get_http_content",
        side_effect=Exception("Network error"),
    ):
        # Call the function
        result = await save_html("https://example.com", "/tmp/test.html")

        # Verify the result is as expected
        assert result["success"] is False
        assert "error" in result
        assert "Network error" in result["error"]


@pytest.mark.asyncio
async def test_save_html_write_error():
    """Test saving HTML with file write error."""
    # Mock get_http_content to return HTML content
    # Mock write_text to raise an exception
    with (
        patch("mcp_toolbox.web.tools.get_http_content", return_value=MOCK_HTML_CONTENT),
        patch("pathlib.Path.write_text", side_effect=Exception("Write error")),
        patch("pathlib.Path.mkdir"),
    ):
        # Call the function
        result = await save_html("https://example.com", "/tmp/test.html")

        # Verify the result is as expected
        assert result["success"] is False
        assert "error" in result
        assert "Write error" in result["error"]


# Test get_html tool
@pytest.mark.asyncio
async def test_get_html_success():
    """Test successful retrieval of HTML."""
    # Mock get_http_content to return HTML content
    with patch("mcp_toolbox.web.tools.get_http_content", return_value=MOCK_HTML_CONTENT):
        # Call the function
        result = await get_html("https://example.com")

        # Verify the result is as expected
        assert result["success"] is True
        assert result["url"] == "https://example.com"
        assert result["content"] == MOCK_HTML_CONTENT


@pytest.mark.asyncio
async def test_get_html_error():
    """Test retrieval of HTML with error."""
    # Mock get_http_content to raise an exception
    with patch(
        "mcp_toolbox.web.tools.get_http_content",
        side_effect=Exception("Network error"),
    ):
        # Call the function
        result = await get_html("https://example.com")

        # Verify the result is as expected
        assert result["success"] is False
        assert "error" in result
        assert "Network error" in result["error"]


# Test search_with_tavily tool if available
if TAVILY_AVAILABLE:

    @pytest.mark.asyncio
    async def test_search_with_tavily_success():
        """Test successful search with Tavily."""
        # Mock search results
        mock_results = [
            {"title": "Result 1", "url": "https://example.com/1", "content": "Content 1"},
            {"title": "Result 2", "url": "https://example.com/2", "content": "Content 2"},
        ]

        # Mock the Tavily client
        mock_client = AsyncMock()
        mock_client.search.return_value = {"results": mock_results}

        # Patch the AsyncTavilyClient
        with patch("mcp_toolbox.web.tools.AsyncTavilyClient", return_value=mock_client):
            # Call the function
            results = await search_with_tavily("test query")

            # Verify the client was called with the correct arguments
            mock_client.search.assert_called_once_with(
                "test query", search_depth="basic", topic="general", time_range=None
            )

            # Verify the results are as expected
            assert results == mock_results

    @pytest.mark.asyncio
    async def test_search_with_tavily_no_results():
        """Test search with Tavily with no results."""
        # Mock empty search results
        mock_results = {"results": []}

        # Mock the Tavily client
        mock_client = AsyncMock()
        mock_client.search.return_value = mock_results

        # Patch the AsyncTavilyClient
        with patch("mcp_toolbox.web.tools.AsyncTavilyClient", return_value=mock_client):
            # Call the function
            result = await search_with_tavily("test query")

            # Verify the result is as expected
            assert result["success"] is False
            assert "error" in result
            assert "No search results found" in result["error"]


# Test search_with_duckduckgo tool if available
if DUCKDUCKGO_AVAILABLE:

    @pytest.mark.asyncio
    async def test_search_with_duckduckgo_success():
        """Test successful search with DuckDuckGo."""
        # Mock search results
        mock_results = [
            {"title": "Result 1", "href": "https://example.com/1", "body": "Content 1"},
            {"title": "Result 2", "href": "https://example.com/2", "body": "Content 2"},
        ]

        # Mock the DDGS instance
        mock_ddgs = MagicMock()
        mock_ddgs.text.return_value = mock_results

        # Patch the DDGS class and anyio.to_thread.run_sync
        with (
            patch("mcp_toolbox.web.tools.DDGS", return_value=mock_ddgs),
            patch("mcp_toolbox.web.tools.anyio.to_thread.run_sync", return_value=mock_results),
        ):
            # Call the function
            results = await search_with_duckduckgo("test query")

            # Verify the results are as expected
            assert results == mock_results

    @pytest.mark.asyncio
    async def test_search_with_duckduckgo_no_results():
        """Test search with DuckDuckGo with no results."""
        # Mock empty search results
        mock_results = []

        # Mock the DDGS instance
        mock_ddgs = MagicMock()
        mock_ddgs.text.return_value = mock_results

        # Patch the DDGS class and anyio.to_thread.run_sync
        with (
            patch("mcp_toolbox.web.tools.DDGS", return_value=mock_ddgs),
            patch("mcp_toolbox.web.tools.anyio.to_thread.run_sync", return_value=mock_results),
        ):
            # Call the function
            result = await search_with_duckduckgo("test query")

            # Verify the result is as expected
            assert result["success"] is False
            assert "error" in result
            assert "No search results found" in result["error"]

```

--------------------------------------------------------------------------------
/llms.txt:
--------------------------------------------------------------------------------

```
# MCP-Toolbox Development Guide for LLMs

This guide is designed to help you (an LLM) effectively contribute to the mcp-toolbox project. It provides essential information about the project structure, development workflow, and best practices.

## Project Overview

MCP-Toolbox is a Python package that provides tools for enhancing LLMs through the Model Context Protocol (MCP). The project implements various API integrations as MCP tools, allowing LLMs to interact with external services.

### Key Components

- **mcp_toolbox/app.py**: Initializes the FastMCP server
- **mcp_toolbox/cli.py**: Command-line interface for running the MCP server
- **mcp_toolbox/config.py**: Configuration management using Pydantic
- **mcp_toolbox/figma/**: Figma API integration tools
- **tests/**: Test files for the project

## Environment Setup

Always help the user set up a proper development environment using `uv`. This is the preferred package manager for this project.

### Setting Up the Environment

```bash
# Install uv if not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh  # For macOS/Linux
# or
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"  # For Windows

# Clone the repository (if not already done)
git clone https://github.com/ai-zerolab/mcp-toolbox.git
cd mcp-toolbox

# Create and activate a virtual environment
uv venv
source .venv/bin/activate  # For macOS/Linux
# or
.venv\Scripts\activate  # For Windows

# Install the package in development mode
uv pip install -e .

# Install development dependencies
uv pip install -e ".[dev]"
```

## GitHub Workflow

Always follow proper GitHub workflow when making changes:

1. **Create a branch with a descriptive name**:
   ```bash
   # Assume the user already has their own fork
   git checkout -b feature/add-spotify-integration
   ```

2. **Make your changes**: Implement the requested features or fixes

3. **Run tests and checks**:
   ```bash
   make check  # Run linting and formatting
   make test   # Run tests
   ```

4. **Commit your changes with descriptive messages**:
   ```bash
   git add .
   git commit -m "feat: add Spotify API integration"
   ```

5. **Push your changes**:
   ```bash
   git push origin feature/add-spotify-integration
   ```

6. **Create a pull request**: Guide the user to create a PR from their branch to the main repository

## Adding New Tools

When adding new API integrations or tools, follow this pattern. Here's an example of adding Spotify API integration:

### 1. Update Config Class

First, update the `config.py` file to include the new API key:

```python
class Config(BaseSettings):
    figma_api_key: str | None = None
    spotify_client_id: str | None = None
    spotify_client_secret: str | None = None

    cache_dir: str = (HOME / "cache").expanduser().resolve().absolute().as_posix()
```
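
Because `Config` extends pydantic's `BaseSettings`, each field is populated from the matching environment variable automatically (names are matched case-insensitively):

```python
import os

os.environ["SPOTIFY_CLIENT_ID"] = "your-client-id"  # illustrative value

from mcp_toolbox.config import Config

config = Config()
assert config.spotify_client_id == "your-client-id"
```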

### 2. Create Module Structure

Create a new module for the integration:

```bash
mkdir -p mcp_toolbox/spotify
touch mcp_toolbox/spotify/__init__.py
touch mcp_toolbox/spotify/tools.py
```

### 3. Implement API Client and Tools

In `mcp_toolbox/spotify/tools.py`:

```python
from typing import Any

import httpx

from mcp_toolbox.app import mcp
from mcp_toolbox.config import Config


class SpotifyApiClient:
    BASE_URL = "https://api.spotify.com/v1"

    def __init__(self):
        self.config = Config()
        self.access_token = None

    async def get_access_token(self) -> str:
        """Get or refresh the Spotify access token."""
        if not self.config.spotify_client_id or not self.config.spotify_client_secret:
            raise ValueError(
                "Spotify credentials not provided. Set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables."
            )

        auth_url = "https://accounts.spotify.com/api/token"

        async with httpx.AsyncClient() as client:
            response = await client.post(
                auth_url,
                data={"grant_type": "client_credentials"},
                auth=(self.config.spotify_client_id, self.config.spotify_client_secret),
            )
            response.raise_for_status()
            data = response.json()
            self.access_token = data["access_token"]
            return self.access_token

    async def make_request(self, path: str, method: str = "GET", params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Make a request to the Spotify API."""
        token = await self.get_access_token()

        async with httpx.AsyncClient() as client:
            headers = {"Authorization": f"Bearer {token}"}
            url = f"{self.BASE_URL}{path}"

            try:
                if method == "GET":
                    response = await client.get(url, headers=headers, params=params)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                spotify_error = e.response.json() if e.response.content else {"status": e.response.status_code, "error": str(e)}
                raise ValueError(f"Spotify API error: {spotify_error}") from e
            except httpx.RequestError as e:
                raise ValueError(f"Request error: {e!s}") from e


# Initialize API client
api_client = SpotifyApiClient()


# Tool implementations
@mcp.tool(
    description="Search for tracks on Spotify. Args: query (required, The search query), limit (optional, Maximum number of results to return)"
)
async def spotify_search_tracks(query: str, limit: int = 10) -> dict[str, Any]:
    """Search for tracks on Spotify.

    Args:
        query: The search query
        limit: Maximum number of results to return (default: 10)
    """
    params = {"q": query, "type": "track", "limit": limit}
    return await api_client.make_request("/search", params=params)


@mcp.tool(
    description="Get details about a specific track. Args: track_id (required, The Spotify ID of the track)"
)
async def spotify_get_track(track_id: str) -> dict[str, Any]:
    """Get details about a specific track.

    Args:
        track_id: The Spotify ID of the track
    """
    return await api_client.make_request(f"/tracks/{track_id}")


@mcp.tool(
    description="Get an artist's top tracks. Args: artist_id (required, The Spotify ID of the artist), market (optional, An ISO 3166-1 alpha-2 country code)"
)
async def spotify_get_artist_top_tracks(artist_id: str, market: str = "US") -> dict[str, Any]:
    """Get an artist's top tracks.

    Args:
        artist_id: The Spotify ID of the artist
        market: An ISO 3166-1 alpha-2 country code (default: US)
    """
    params = {"market": market}
    return await api_client.make_request(f"/artists/{artist_id}/top-tracks", params=params)
```

### 4. Create Tests

Create test files for your new tools:

```bash
mkdir -p tests/spotify
touch tests/spotify/test_tools.py
mkdir -p tests/mock/spotify
```

### 5. Update README

Always update the README.md when adding new environment variables or tools:

```markdown
## Environment Variables

The following environment variables can be configured:

- `FIGMA_API_KEY`: API key for Figma integration
- `SPOTIFY_CLIENT_ID`: Client ID for Spotify API
- `SPOTIFY_CLIENT_SECRET`: Client Secret for Spotify API
```

## Error Handling Best Practices

When implementing tools, follow these error handling best practices:

1. **Graceful Degradation**: If one API key is missing, other tools should still work
   ```python
   async def get_access_token(self) -> str:
       if not self.config.spotify_client_id or not self.config.spotify_client_secret:
           raise ValueError(
               "Spotify credentials not provided. Set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables."
           )
       # Rest of the method...
   ```

2. **Descriptive Error Messages**: Provide clear error messages that help users understand what went wrong
   ```python
   except httpx.HTTPStatusError as e:
       spotify_error = e.response.json() if e.response.content else {"status": e.response.status_code, "error": str(e)}
       raise ValueError(f"Spotify API error: {spotify_error}") from e
   ```

3. **Proper Exception Handling**: Catch specific exceptions and handle them appropriately

4. **Fallbacks**: Implement fallback mechanisms when possible, as in the sketch below
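   A minimal sketch (`search_with_tavily`/`search_with_duckduckgo` mirror the project's web tools; the wrapper itself is illustrative):
   ```python
   async def search(query: str) -> list[dict]:
       """Try Tavily first; fall back to DuckDuckGo if it fails."""
       try:
           return await search_with_tavily(query)
       except Exception as e:
           logger.warning(f"Tavily search failed ({e}), falling back to DuckDuckGo")
           return await search_with_duckduckgo(query)
   ```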

## Testing

Always write tests for new functionality:

```python
import json
from pathlib import Path
from unittest.mock import patch

import pytest

from mcp_toolbox.spotify.tools import (
    SpotifyApiClient,
    spotify_search_tracks,
    spotify_get_track,
    spotify_get_artist_top_tracks,
)


# Helper function to load mock data
def load_mock_data(filename):
    mock_dir = Path(__file__).parent.parent / "mock" / "spotify"
    file_path = mock_dir / filename

    if not file_path.exists():
        # Create empty mock data if it doesn't exist
        mock_data = {"mock": "data"}
        with open(file_path, "w") as f:
            json.dump(mock_data, f)

    with open(file_path) as f:
        return json.load(f)


# Patch the SpotifyApiClient.make_request method
@pytest.fixture
def mock_make_request():
    with patch.object(SpotifyApiClient, "make_request") as mock:
        def side_effect(path, method="GET", params=None):
            if path == "/search":
                return load_mock_data("search_tracks.json")
            elif path.startswith("/tracks/"):
                return load_mock_data("get_track.json")
            elif path.endswith("/top-tracks"):
                return load_mock_data("get_artist_top_tracks.json")
            return {"mock": "data"}

        mock.side_effect = side_effect
        yield mock


# Test spotify_search_tracks function
@pytest.mark.asyncio
async def test_spotify_search_tracks(mock_make_request):
    await spotify_search_tracks("test query")
    mock_make_request.assert_called_once()
    assert mock_make_request.call_args[0][0] == "/search"


# Test spotify_get_track function
@pytest.mark.asyncio
async def test_spotify_get_track(mock_make_request):
    await spotify_get_track("track_id")
    mock_make_request.assert_called_once()
    assert mock_make_request.call_args[0][0] == "/tracks/track_id"


# Test spotify_get_artist_top_tracks function
@pytest.mark.asyncio
async def test_spotify_get_artist_top_tracks(mock_make_request):
    await spotify_get_artist_top_tracks("artist_id")
    mock_make_request.assert_called_once()
    assert mock_make_request.call_args[0][0] == "/artists/artist_id/top-tracks"
```

## Documentation

When adding new tools, make sure to:

1. Add clear docstrings to all functions and classes
2. Include detailed argument descriptions in the `@mcp.tool` decorator
3. Add type hints to all functions and methods
4. Update the README.md with new environment variables and tools

## Final Checklist

Before submitting your changes:

1. ✅ Run `make check` to ensure code quality
2. ✅ Run `make test` to ensure all tests pass
3. ✅ Update documentation if needed
4. ✅ Add new environment variables to README.md
5. ✅ Follow proper Git workflow (branch, commit, push)

```

--------------------------------------------------------------------------------
/mcp_toolbox/figma/tools.py:
--------------------------------------------------------------------------------

```python
import json
import time
from pathlib import Path
from typing import Annotated, Any

import httpx
from pydantic import BaseModel, Field

from mcp_toolbox.app import mcp
from mcp_toolbox.config import Config


# Type definitions for request/response parameters
class ClientMeta(BaseModel):
    x: float
    y: float
    node_id: str | None = None
    node_offset: dict[str, float] | None = None


# API Client
class FigmaApiClient:
    BASE_URL = "https://api.figma.com/v1"

    def __init__(self):
        self.config = Config()

    async def get_access_token(self) -> str:
        if not self.config.figma_api_key:
            raise ValueError("No Figma API key provided. Set the FIGMA_API_KEY environment variable.")
        return self.config.figma_api_key

    async def make_request(self, path: str, method: str = "GET", data: Any = None) -> dict[str, Any]:
        token = await self.get_access_token()

        async with httpx.AsyncClient(
            transport=httpx.AsyncHTTPTransport(retries=3),
            timeout=30,
        ) as client:
            headers = {"X-Figma-Token": token}
            url = f"{self.BASE_URL}{path}"

            try:
                if method == "GET":
                    response = await client.get(url, headers=headers)
                elif method == "POST":
                    response = await client.post(url, headers=headers, json=data)
                elif method == "DELETE":
                    response = await client.delete(url, headers=headers)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                figma_error = (
                    e.response.json() if e.response.content else {"status": e.response.status_code, "err": str(e)}
                )
                raise ValueError(
                    f"Figma API error: {figma_error.get('err', figma_error.get('message', str(e)))}"
                ) from e
            except httpx.RequestError as e:
                raise ValueError(f"Request error: {e!s}") from e

    def build_query_string(self, params: dict[str, Any]) -> str:
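        # Example (illustrative): {"ids": ["1:2", "1:3"], "depth": None}
        # -> "?ids=1:2,1:3" (None values dropped, lists comma-joined, values not URL-encoded)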
        # Filter out None values
        filtered_params = {k: v for k, v in params.items() if v is not None}

        if not filtered_params:
            return ""

        # Convert lists to comma-separated strings
        for key, value in filtered_params.items():
            if isinstance(value, list):
                filtered_params[key] = ",".join(map(str, value))

        # Build query string
        query_parts = [f"{k}={v}" for k, v in filtered_params.items()]
        return "?" + "&".join(query_parts)


# Cache Manager
class CacheManager:
    def __init__(self):
        self.config = Config()
        self.cache_dir = Path(self.config.cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def save_to_cache(self, filename: str, data: Any) -> str:
        file_path = self.cache_dir / filename
        with open(file_path, "w") as f:
            json.dump(data, f, indent=2)
        return str(file_path)


# Initialize API client and cache manager
api_client = FigmaApiClient()
cache_manager = CacheManager()


# Tool implementations
@mcp.tool(description="Get a Figma file by key")
async def figma_get_file(
    file_key: Annotated[str, Field(description="The key of the file to get")],
    version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None,
    depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None,
    branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None,
) -> dict[str, Any]:
    """Get a Figma file by key."""
    params = {"version": version, "depth": depth, "branch_data": branch_data}

    query_string = api_client.build_query_string(params)
    result = await api_client.make_request(f"/files/{file_key}{query_string}")

    # Save to cache
    try:
        filename = f"file_{file_key}_{int(time.time() * 1000)}.json"
        file_path = cache_manager.save_to_cache(filename, result)
        return {
            "file_path": file_path,
            "message": "File data saved to local cache. Use this file path to access the complete data.",
        }
    except Exception:
        # If saving to cache fails, return original result
        return result


@mcp.tool(description="Get specific nodes from a Figma file.")
async def figma_get_file_nodes(
    file_key: Annotated[str, Field(description="The key of the file to get nodes from")],
    node_ids: Annotated[list[str], Field(description="Array of node IDs to get")],
    depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None,
    version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None,
) -> dict[str, Any]:
    """Get specific nodes from a Figma file."""
    params = {"ids": node_ids, "depth": depth, "version": version}

    query_string = api_client.build_query_string(params)
    result = await api_client.make_request(f"/files/{file_key}/nodes{query_string}")

    # Save to cache
    try:
        filename = f"file_nodes_{file_key}_{int(time.time() * 1000)}.json"
        file_path = cache_manager.save_to_cache(filename, result)
        return {
            "file_path": file_path,
            "message": "File nodes data saved to local cache. Use this file path to access the complete data.",
        }
    except Exception:
        # If saving to cache fails, return original result
        return result


@mcp.tool(description="Get images for nodes in a Figma file.")
async def figma_get_image(
    file_key: Annotated[str, Field(description="The key of the file to get images from")],
    ids: Annotated[list[str], Field(description="Array of node IDs to render")],
    scale: Annotated[float | None, Field(default=None, description="Scale factor to render at 0.01-4")] = None,
    format_type: Annotated[str | None, Field(default=None, description="Image format jpg/png/svg/pdf")] = None,
    svg_include_id: Annotated[bool | None, Field(default=None, description="Include IDs in SVG output")] = None,
    svg_simplify_stroke: Annotated[
        bool | None, Field(default=None, description="Simplify strokes in SVG output")
    ] = None,
    use_absolute_bounds: Annotated[bool | None, Field(default=None, description="Use absolute bounds")] = None,
) -> dict[str, Any]:
    """Get images for nodes in a Figma file."""
    params = {
        "ids": ids,
        "scale": scale,
        "format": format_type,
        "svg_include_id": svg_include_id,
        "svg_simplify_stroke": svg_simplify_stroke,
        "use_absolute_bounds": use_absolute_bounds,
    }

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/images/{file_key}{query_string}")


@mcp.tool(description="Get URLs for images used in a Figma file.")
async def figma_get_image_fills(
    file_key: Annotated[str, Field(description="The key of the file to get image fills from")],
) -> dict[str, Any]:
    """Get URLs for images used in a Figma file."""
    return await api_client.make_request(f"/files/{file_key}/images")


@mcp.tool(description="Get comments on a Figma file.")
async def figma_get_comments(
    file_key: Annotated[str, Field(description="The key of the file to get comments from")],
) -> dict[str, Any]:
    """Get comments on a Figma file."""
    return await api_client.make_request(f"/files/{file_key}/comments")


@mcp.tool(description="Post a comment on a Figma file.")
async def figma_post_comment(
    file_key: Annotated[str, Field(description="The key of the file to comment on")],
    message: Annotated[str, Field(description="Comment message text")],
    client_meta: Annotated[
        dict[str, Any] | None, Field(default=None, description="Position of the comment x/y/node_id/node_offset")
    ] = None,
    comment_id: Annotated[str | None, Field(default=None, description="ID of comment to reply to")] = None,
) -> dict[str, Any]:
    """Post a comment on a Figma file."""
    comment_data = {"message": message}

    if client_meta:
        comment_data["client_meta"] = client_meta

    if comment_id:
        comment_data["comment_id"] = comment_id

    return await api_client.make_request(f"/files/{file_key}/comments", "POST", comment_data)


@mcp.tool(description="Delete a comment from a Figma file.")
async def figma_delete_comment(
    file_key: Annotated[str, Field(description="The key of the file to delete a comment from")],
    comment_id: Annotated[str, Field(description="ID of the comment to delete")],
) -> dict[str, Any]:
    """Delete a comment from a Figma file."""
    return await api_client.make_request(f"/files/{file_key}/comments/{comment_id}", "DELETE")


@mcp.tool(description="Get projects for a team.")
async def figma_get_team_projects(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Get projects for a team."""
    params = {"page_size": page_size, "cursor": cursor}

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/teams/{team_id}/projects{query_string}")


@mcp.tool(description="Get files for a project.")
async def figma_get_project_files(
    project_id: Annotated[str, Field(description="The project ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
    branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None,
) -> dict[str, Any]:
    """Get files for a project."""
    params = {"page_size": page_size, "cursor": cursor, "branch_data": branch_data}

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/projects/{project_id}/files{query_string}")


@mcp.tool(description="Get components for a team.")
async def figma_get_team_components(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Get components for a team."""
    params = {"page_size": page_size, "cursor": cursor}

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/teams/{team_id}/components{query_string}")


@mcp.tool(description="Get components from a file.")
async def figma_get_file_components(
    file_key: Annotated[str, Field(description="The key of the file to get components from")],
) -> dict[str, Any]:
    """Get components from a file."""
    return await api_client.make_request(f"/files/{file_key}/components")


@mcp.tool(description="Get a component by key.")
async def figma_get_component(key: Annotated[str, Field(description="The component key")]) -> dict[str, Any]:
    """Get a component by key."""
    return await api_client.make_request(f"/components/{key}")


@mcp.tool(description="Get component sets for a team.")
async def figma_get_team_component_sets(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Get component sets for a team."""
    params = {"page_size": page_size, "cursor": cursor}

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/teams/{team_id}/component_sets{query_string}")


@mcp.tool(description="Get styles for a team.")
async def figma_get_team_styles(
    team_id: Annotated[str, Field(description="The team ID")],
    page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
    cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
) -> dict[str, Any]:
    """Get styles for a team."""
    params = {"page_size": page_size, "cursor": cursor}

    query_string = api_client.build_query_string(params)
    return await api_client.make_request(f"/teams/{team_id}/styles{query_string}")


@mcp.tool(description="Get styles from a file.")
async def figma_get_file_styles(
    file_key: Annotated[str, Field(description="The key of the file to get styles from")],
) -> dict[str, Any]:
    """Get styles from a file."""
    return await api_client.make_request(f"/files/{file_key}/styles")


@mcp.tool(description="Get a style by key.")
async def figma_get_style(key: Annotated[str, Field(description="The style key")]) -> dict[str, Any]:
    """Get a style by key."""
    return await api_client.make_request(f"/styles/{key}")

```
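
The wrappers above all follow one pattern: collect the optional parameters, let `build_query_string` drop the `None` values, and delegate to `api_client.make_request`. Since the tests below await the tool coroutines directly, they can also be driven outside the MCP server; a minimal sketch, assuming a configured Figma token and placeholder file/node IDs:

```python
# Illustrative driver only -- "abc123" and the node IDs are placeholders.
import asyncio

from mcp_toolbox.figma.tools import figma_get_image


async def main() -> None:
    # Optional parameters left as None never reach the query string.
    result = await figma_get_image("abc123", ids=["1:2", "1:3"], scale=2.0, format_type="png")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```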

--------------------------------------------------------------------------------
/tests/figma/test_figma_tools.py:
--------------------------------------------------------------------------------

```python
import json
from pathlib import Path
from unittest.mock import patch

import pytest

from mcp_toolbox.figma.tools import (
    CacheManager,
    FigmaApiClient,
    figma_delete_comment,
    figma_get_comments,
    figma_get_component,
    figma_get_file,
    figma_get_file_components,
    figma_get_file_nodes,
    figma_get_file_styles,
    figma_get_image,
    figma_get_image_fills,
    figma_get_project_files,
    figma_get_style,
    figma_get_team_component_sets,
    figma_get_team_components,
    figma_get_team_projects,
    figma_get_team_styles,
    figma_post_comment,
)


# Helper function to load mock data
def load_mock_data(filename):
    mock_dir = Path(__file__).parent.parent / "mock" / "figma"
    file_path = mock_dir / filename

    if not file_path.exists():
        # Create placeholder mock data if the file doesn't exist
        mock_data = {"mock": "data"}
        with open(file_path, "w") as f:
            json.dump(mock_data, f)

    with open(file_path) as f:
        return json.load(f)


# Patch the FigmaApiClient.make_request method
@pytest.fixture
def mock_make_request():
    with patch.object(FigmaApiClient, "make_request") as mock:

        def side_effect(path, method="GET", data=None):
            # Extract the tool name from the path
            parts = path.strip("/").split("/")

            if len(parts) >= 2 and parts[0] == "files" and parts[1]:

                if len(parts) == 2:
                    # get_file
                    return load_mock_data("get_file.json")
                elif len(parts) == 3:
                    if parts[2] == "nodes":
                        # get_file_nodes
                        return load_mock_data("get_file_nodes.json")
                    elif parts[2] == "images":
                        # get_image_fills
                        return load_mock_data("get_image_fills.json")
                    elif parts[2] == "components":
                        # get_file_components
                        return load_mock_data("get_file_components.json")
                    elif parts[2] == "styles":
                        # get_file_styles
                        return load_mock_data("get_file_styles.json")
                    elif parts[2] == "comments":
                        if method == "GET":
                            # get_comments
                            return load_mock_data("get_comments.json")
                        elif method == "POST":
                            # post_comment
                            return load_mock_data("post_comment.json")
                elif len(parts) == 4 and parts[2] == "comments":
                    # delete_comment
                    return load_mock_data("delete_comment.json")

            elif parts[0] == "images" and len(parts) >= 2:
                # get_image
                return load_mock_data("get_image.json")

            elif parts[0] == "teams" and len(parts) >= 3:

                if parts[2] == "projects":
                    # get_team_projects
                    return load_mock_data("get_team_projects.json")
                elif parts[2] == "components":
                    # get_team_components
                    return load_mock_data("get_team_components.json")
                elif parts[2] == "component_sets":
                    # get_team_component_sets
                    return load_mock_data("get_team_component_sets.json")
                elif parts[2] == "styles":
                    # get_team_styles
                    return load_mock_data("get_team_styles.json")

            elif parts[0] == "projects" and len(parts) >= 3:
                # get_project_files
                return load_mock_data("get_project_files.json")

            elif parts[0] == "components" and len(parts) >= 2:
                # get_component
                return load_mock_data("get_component.json")

            elif parts[0] == "styles" and len(parts) >= 2:
                # get_style
                return load_mock_data("get_style.json")

            # Default mock data
            return {"mock": "data"}

        mock.side_effect = side_effect
        yield mock


# Patch the CacheManager.save_to_cache method
@pytest.fixture
def mock_save_to_cache():
    with patch.object(CacheManager, "save_to_cache") as mock:
        mock.return_value = "/mock/path/to/cache/file.json"
        yield mock


# Test get_file function
@pytest.mark.asyncio
async def test_get_file(mock_make_request, mock_save_to_cache):
    # Test with minimal parameters
    result = await figma_get_file("test_file_key")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key")

    # Verify save_to_cache was called
    mock_save_to_cache.assert_called_once()

    # Verify the result contains expected fields
    assert "file_path" in result
    assert "message" in result
    assert result["file_path"] == "/mock/path/to/cache/file.json"

    # Reset mocks for next test
    mock_make_request.reset_mock()
    mock_save_to_cache.reset_mock()

    # Test with all parameters
    result = await figma_get_file("test_file_key", version="123", depth=2, branch_data=True)

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key?version=123&depth=2&branch_data=True")


# Test get_file_nodes function
@pytest.mark.asyncio
async def test_get_file_nodes(mock_make_request, mock_save_to_cache):
    # Test with minimal parameters
    result = await figma_get_file_nodes("test_file_key", ["node1", "node2"])

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2")

    # Verify save_to_cache was called
    mock_save_to_cache.assert_called_once()

    # Verify the result contains expected fields
    assert "file_path" in result
    assert "message" in result
    assert result["file_path"] == "/mock/path/to/cache/file.json"

    # Reset mocks for next test
    mock_make_request.reset_mock()
    mock_save_to_cache.reset_mock()

    # Test with all parameters
    result = await figma_get_file_nodes("test_file_key", ["node1", "node2"], depth=2, version="123")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2&depth=2&version=123")


# Test get_image function
@pytest.mark.asyncio
async def test_get_image(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_image("test_file_key", ["node1", "node2"])

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/images/test_file_key?ids=node1,node2")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_image(
        "test_file_key",
        ["node1", "node2"],
        scale=2.0,
        format_type="png",
        svg_include_id=True,
        svg_simplify_stroke=True,
        use_absolute_bounds=True,
    )

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with(
        "/images/test_file_key?ids=node1,node2&scale=2.0&format=png&svg_include_id=True&svg_simplify_stroke=True&use_absolute_bounds=True"
    )


# Test get_image_fills function
@pytest.mark.asyncio
async def test_get_image_fills(mock_make_request):
    result = await figma_get_image_fills("test_file_key")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/images")


# Test get_comments function
@pytest.mark.asyncio
async def test_get_comments(mock_make_request):
    result = await figma_get_comments("test_file_key")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/comments")


# Test post_comment function
@pytest.mark.asyncio
async def test_post_comment(mock_make_request):
    # Test with minimal parameters
    result = await figma_post_comment("test_file_key", "Test comment")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/comments", "POST", {"message": "Test comment"})

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    client_meta = {"x": 100, "y": 200, "node_id": "node1", "node_offset": {"x": 10, "y": 20}}

    result = await figma_post_comment("test_file_key", "Test comment", client_meta=client_meta, comment_id="comment1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with(
        "/files/test_file_key/comments",
        "POST",
        {"message": "Test comment", "client_meta": client_meta, "comment_id": "comment1"},
    )


# Test delete_comment function
@pytest.mark.asyncio
async def test_delete_comment(mock_make_request):
    result = await figma_delete_comment("test_file_key", "comment1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/comments/comment1", "DELETE")


# Test get_team_projects function
@pytest.mark.asyncio
async def test_get_team_projects(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_team_projects("team1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/projects")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_team_projects("team1", page_size=10, cursor="cursor1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/projects?page_size=10&cursor=cursor1")


# Test get_project_files function
@pytest.mark.asyncio
async def test_get_project_files(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_project_files("project1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/projects/project1/files")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_project_files("project1", page_size=10, cursor="cursor1", branch_data=True)

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/projects/project1/files?page_size=10&cursor=cursor1&branch_data=True")


# Test get_team_components function
@pytest.mark.asyncio
async def test_get_team_components(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_team_components("team1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/components")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_team_components("team1", page_size=10, cursor="cursor1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/components?page_size=10&cursor=cursor1")


# Test get_file_components function
@pytest.mark.asyncio
async def test_get_file_components(mock_make_request):
    result = await figma_get_file_components("test_file_key")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/components")


# Test get_component function
@pytest.mark.asyncio
async def test_get_component(mock_make_request):
    result = await figma_get_component("component1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/components/component1")


# Test get_team_component_sets function
@pytest.mark.asyncio
async def test_get_team_component_sets(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_team_component_sets("team1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/component_sets")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_team_component_sets("team1", page_size=10, cursor="cursor1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/component_sets?page_size=10&cursor=cursor1")


# Test get_team_styles function
@pytest.mark.asyncio
async def test_get_team_styles(mock_make_request):
    # Test with minimal parameters
    result = await figma_get_team_styles("team1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/styles")

    # Reset mock for next test
    mock_make_request.reset_mock()

    # Test with all parameters
    result = await figma_get_team_styles("team1", page_size=10, cursor="cursor1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/teams/team1/styles?page_size=10&cursor=cursor1")


# Test get_file_styles function
@pytest.mark.asyncio
async def test_get_file_styles(mock_make_request):
    result = await figma_get_file_styles("test_file_key")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/files/test_file_key/styles")


# Test get_style function
@pytest.mark.asyncio
async def test_get_style(mock_make_request):
    result = await figma_get_style("style1")

    # Verify make_request was called with correct parameters
    mock_make_request.assert_called_once_with("/styles/style1")

```
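
Extending the suite follows the same recipe: add a JSON fixture under `tests/mock/figma/`, route it in the `side_effect` dispatcher, and assert on the exact request path. A sketch of one more test in that style (hypothetical; it reuses the `mock_make_request` fixture defined above):

```python
import pytest

from mcp_toolbox.figma.tools import figma_get_team_projects


@pytest.mark.asyncio
async def test_get_team_projects_cursor_only(mock_make_request):
    # Only the cursor is supplied, so page_size should be dropped
    # from the query string.
    await figma_get_team_projects("team1", cursor="cursor2")

    mock_make_request.assert_called_once_with("/teams/team1/projects?cursor=cursor2")
```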

--------------------------------------------------------------------------------
/mcp_toolbox/file_ops/tools.py:
--------------------------------------------------------------------------------

```python
"""File operations tools for MCP-Toolbox."""

import re
import stat
from datetime import datetime
from pathlib import Path
from typing import Annotated, Any

from pydantic import Field

from mcp_toolbox.app import mcp


@mcp.tool(description="Read file content.")
async def read_file_content(
    path: Annotated[str, Field(description="Path to the file to read")],
    encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
    chunk_size: Annotated[
        int,
        Field(default=1000000, description="Size of each chunk in bytes, default: 1MB"),
    ] = 1000000,
    chunk_index: Annotated[int, Field(default=0, description="Index of the chunk to retrieve, 0-based")] = 0,
) -> dict[str, Any]:
    """Read content from a file, with support for chunked reading for large files.

    Args:
        path: Path to the file to read
        encoding: Optional. File encoding (default: utf-8)
        chunk_size: Optional. Size of each chunk in bytes (default: 1000000, which is about 1 MB)
        chunk_index: Optional. Index of the chunk to retrieve, 0-based (default: 0)

    Returns:
        Dictionary containing content and metadata, including chunking information
    """
    try:
        file_path = Path(path).expanduser()

        if not file_path.exists():
            return {
                "error": f"File not found: {path}",
                "content": "",
                "success": False,
            }

        if not file_path.is_file():
            return {
                "error": f"Path is not a file: {path}",
                "content": "",
                "success": False,
            }

        # Get file stats
        stats = file_path.stat()
        file_size = stats.st_size

        # Calculate total chunks
        total_chunks = (file_size + chunk_size - 1) // chunk_size  # Ceiling division

        # Validate chunk_index
        if chunk_index < 0 or (file_size > 0 and chunk_index >= total_chunks):
            return {
                "error": f"Invalid chunk index: {chunk_index}. Valid range is 0 to {total_chunks - 1}",
                "content": "",
                "success": False,
                "total_chunks": total_chunks,
                "file_size": file_size,
            }

        # Calculate start and end positions for the chunk
        start_pos = chunk_index * chunk_size
        end_pos = min(start_pos + chunk_size, file_size)
        chunk_actual_size = end_pos - start_pos

        # Read the specified chunk
        content = ""
        with open(file_path, "rb") as f:
            f.seek(start_pos)
            chunk_bytes = f.read(chunk_actual_size)

            try:
                # Strict decoding, so genuinely binary content raises
                # UnicodeDecodeError and reaches the base64 fallback below
                # (errors="replace" would never raise). Note that a multi-byte
                # character split across a chunk boundary also trips this.
                content = chunk_bytes.decode(encoding)
            except UnicodeDecodeError:
                # If decoding fails, return base64 encoded binary data
                import base64

                content = base64.b64encode(chunk_bytes).decode("ascii")
                encoding = f"base64 (original: {encoding})"

        return {
            "content": content,
            "size": file_size,
            "chunk_size": chunk_size,
            "chunk_index": chunk_index,
            "chunk_actual_size": chunk_actual_size,
            "total_chunks": total_chunks,
            "is_last_chunk": chunk_index == total_chunks - 1,
            "encoding": encoding,
            "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
            "success": True,
        }
    except UnicodeDecodeError:
        return {
            "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.",
            "content": "",
            "success": False,
        }
    except Exception as e:
        return {
            "error": f"Failed to read file: {e!s}",
            "content": "",
            "success": False,
        }


@mcp.tool(description="Write content to a file.")
async def write_file_content(
    path: Annotated[str, Field(description="Path to the file to write")],
    content: Annotated[str, Field(description="Content to write")],
    encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
    append: Annotated[bool, Field(default=False, description="Whether to append to the file")] = False,
) -> dict[str, Any]:
    """Write content to a file.

    Args:
        path: Path to the file to write
        content: Content to write to the file
        encoding: Optional. File encoding (default: utf-8)
        append: Optional. Whether to append to the file (default: False)

    Returns:
        Dictionary containing success status and metadata
    """
    try:
        file_path = Path(path).expanduser()

        # Create parent directories if they don't exist
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Write content to file
        mode = "a" if append else "w"
        with open(file_path, mode, encoding=encoding) as f:
            f.write(content)

        # Get file stats
        stats = file_path.stat()

        return {
            "path": str(file_path),
            "size": stats.st_size,
            "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
            "success": True,
        }
    except Exception as e:
        return {
            "error": f"Failed to write file: {e!s}",
            "path": path,
            "success": False,
        }
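
# Example call (illustrative only): append a line, creating parent
# directories as needed.
#
#     await write_file_content("~/logs/app.log", "started\n", append=True)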


@mcp.tool(description="Replace content in a file using regular expressions.")
async def replace_in_file(
    path: Annotated[str, Field(description="Path to the file")],
    pattern: Annotated[
        str,
        Field(
            description="Python regular expression pattern (re module). Supports groups, character classes, quantifiers, etc. Examples: '[a-z]+' for lowercase words, '\\d{3}-\\d{4}' for number patterns. Remember to escape backslashes."
        ),
    ],
    replacement: Annotated[str, Field(description="Replacement string")],
    encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
    count: Annotated[int, Field(default=0, description="Maximum number of replacements")] = 0,
) -> dict[str, Any]:
    """Replace content in a file using regular expressions.

    Args:
        path: Path to the file
        pattern: Regular expression pattern
        replacement: Replacement string
        encoding: Optional. File encoding (default: utf-8)
        count: Optional. Maximum number of replacements (default: 0, which means all occurrences)

    Returns:
        Dictionary containing success status and replacement information
    """
    try:
        file_path = Path(path).expanduser()

        if not file_path.exists():
            return {
                "error": f"File not found: {path}",
                "success": False,
                "replacements": 0,
            }

        if not file_path.is_file():
            return {
                "error": f"Path is not a file: {path}",
                "success": False,
                "replacements": 0,
            }

        # Read file content
        with open(file_path, encoding=encoding) as f:
            content = f.read()

        # Compile regex pattern
        try:
            regex = re.compile(pattern)
        except re.error as e:
            return {
                "error": f"Invalid regular expression: {e!s}",
                "success": False,
                "replacements": 0,
            }

        # Replace content
        new_content, replacements = regex.subn(replacement, content, count=count)

        if replacements > 0:
            # Write updated content back to file
            with open(file_path, "w", encoding=encoding) as f:
                f.write(new_content)

        return {
            "path": str(file_path),
            "replacements": replacements,
            "success": True,
        }
    except UnicodeDecodeError:
        return {
            "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.",
            "success": False,
            "replacements": 0,
        }
    except Exception as e:
        return {
            "error": f"Failed to replace content: {e!s}",
            "success": False,
            "replacements": 0,
        }
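
# Example call (illustrative only): replace every ISO date with a placeholder.
# count=0, the default, replaces all occurrences, mirroring re.subn.
#
#     await replace_in_file("notes.txt", r"\d{4}-\d{2}-\d{2}", "<date>")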


def _format_mode(mode: int) -> str:
    """Format file mode into a string representation.

    Args:
        mode: File mode as an integer

    Returns:
        String representation of file permissions
    """
    result = ""

    # File type
    if stat.S_ISDIR(mode):
        result += "d"
    elif stat.S_ISLNK(mode):
        result += "l"
    else:
        result += "-"

    # User permissions
    result += "r" if mode & stat.S_IRUSR else "-"
    result += "w" if mode & stat.S_IWUSR else "-"
    result += "x" if mode & stat.S_IXUSR else "-"

    # Group permissions
    result += "r" if mode & stat.S_IRGRP else "-"
    result += "w" if mode & stat.S_IWGRP else "-"
    result += "x" if mode & stat.S_IXGRP else "-"

    # Other permissions
    result += "r" if mode & stat.S_IROTH else "-"
    result += "w" if mode & stat.S_IWOTH else "-"
    result += "x" if mode & stat.S_IXOTH else "-"

    return result
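
# For example (illustrative): _format_mode(0o100755) returns "-rwxr-xr-x" for
# a regular file, and _format_mode(0o040700) returns "drwx------" for a
# directory (the leading character comes from the S_ISDIR/S_ISLNK checks).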


def _get_file_info(path: Path) -> dict[str, Any]:
    """Get detailed information about a file or directory.

    Args:
        path: Path to the file or directory

    Returns:
        Dictionary containing file information
    """
    stats = path.stat()

    # Format timestamps
    mtime = datetime.fromtimestamp(stats.st_mtime).isoformat()
    ctime = datetime.fromtimestamp(stats.st_ctime).isoformat()
    atime = datetime.fromtimestamp(stats.st_atime).isoformat()

    # Get file type (check symlinks first: is_dir() follows links, so a
    # symlink to a directory would otherwise be reported as a directory)
    if path.is_symlink():
        file_type = "symlink"
    elif path.is_dir():
        file_type = "directory"
    else:
        file_type = "file"

    # Format size
    size = stats.st_size
    size_str = f"{size} bytes"
    if size >= 1024:
        size_str = f"{size / 1024:.2f} KB"
    if size >= 1024 * 1024:
        size_str = f"{size / (1024 * 1024):.2f} MB"
    if size >= 1024 * 1024 * 1024:
        size_str = f"{size / (1024 * 1024 * 1024):.2f} GB"

    return {
        "name": path.name,
        "path": str(path),
        "type": file_type,
        "size": size,
        "size_formatted": size_str,
        "permissions": _format_mode(stats.st_mode),
        "mode": stats.st_mode,
        "owner": stats.st_uid,
        "group": stats.st_gid,
        "created": ctime,
        "modified": mtime,
        "accessed": atime,
    }


@mcp.tool(description="List directory contents with detailed information.")
async def list_directory(  # noqa: C901
    path: Annotated[str, Field(description="Directory path")],
    recursive: Annotated[bool, Field(default=False, description="Whether to list recursively")] = False,
    max_depth: Annotated[int, Field(default=-1, description="Maximum recursion depth")] = -1,
    include_hidden: Annotated[bool, Field(default=False, description="Whether to include hidden files")] = False,
    ignore_patterns: Annotated[
        list[str] | None,
        Field(
            default=[
                "node_modules",
                "dist",
                "build",
                "public",
                "static",
                ".next",
                ".git",
                ".vscode",
                ".idea",
                ".DS_Store",
                ".env",
                ".venv",
            ],
            description="Glob patterns to ignore (e.g. ['node_modules', '*.tmp'])",
        ),
    ] = None,
) -> dict[str, Any]:
    """List directory contents with detailed information.

    Args:
        path: Directory path
        recursive: Optional. Whether to list recursively (default: False)
        max_depth: Optional. Maximum recursion depth (default: -1, which means no limit)
        include_hidden: Optional. Whether to include hidden files (default: False)
        ignore_patterns: Optional. Glob patterns to ignore (default: ['node_modules', 'dist', 'build', 'public', 'static', '.next', '.git', '.vscode', '.idea', '.DS_Store', '.env', '.venv'])

    Returns:
        Dictionary containing directory contents and metadata
    """
    ignore_patterns = (
        ignore_patterns
        if ignore_patterns is not None
        else [
            "node_modules",
            "dist",
            "build",
            "public",
            "static",
            ".next",
            ".git",
            ".vscode",
            ".idea",
            ".DS_Store",
            ".env",
            ".venv",
        ]
    )
    try:
        dir_path = Path(path).expanduser()

        if not dir_path.exists():
            return {
                "error": f"Directory not found: {path}",
                "entries": [],
                "success": False,
            }

        if not dir_path.is_dir():
            return {
                "error": f"Path is not a directory: {path}",
                "entries": [],
                "success": False,
            }

        entries = []

        # Import fnmatch for pattern matching
        import fnmatch

        def should_ignore(path: Path) -> bool:
            """Check if a path should be ignored based on ignore patterns.

            Args:
                path: Path to check

            Returns:
                True if the path should be ignored, False otherwise
            """
            if not ignore_patterns:
                return False

            return any(fnmatch.fnmatch(path.name, pattern) for pattern in ignore_patterns)

        def process_directory(current_path: Path, current_depth: int = 0) -> None:
            """Process a directory and its contents recursively.

            Args:
                current_path: Path to the current directory
                current_depth: Current recursion depth
            """
            nonlocal entries

            # Check if we've reached the maximum depth
            if max_depth >= 0 and current_depth > max_depth:
                return

            try:
                # List directory contents
                for item in current_path.iterdir():
                    # Skip hidden files if not included
                    if not include_hidden and item.name.startswith("."):
                        continue

                    # Skip ignored patterns
                    if should_ignore(item):
                        continue

                    # Get file information
                    file_info = _get_file_info(item)
                    file_info["depth"] = current_depth
                    entries.append(file_info)

                    # Recursively process subdirectories
                    if recursive and item.is_dir():
                        process_directory(item, current_depth + 1)
            except PermissionError:
                # Add an entry indicating permission denied
                entries.append({
                    "name": current_path.name,
                    "path": str(current_path),
                    "type": "directory",
                    "error": "Permission denied",
                    "depth": current_depth,
                })

        # Start processing from the root directory
        process_directory(dir_path)

        return {
            "path": str(dir_path),
            "entries": entries,
            "count": len(entries),
            "success": True,
        }
    except Exception as e:
        return {
            "error": f"Failed to list directory: {e!s}",
            "entries": [],
            "success": False,
        }
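
# Example call (illustrative only): recursively list a project tree two
# levels deep, skipping the default ignore patterns such as node_modules.
#
#     await list_directory("~/project", recursive=True, max_depth=2)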

```
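
Because `read_file_content` reports `total_chunks` and `is_last_chunk`, a caller can stream an arbitrarily large file by walking the chunk indices. A minimal sketch (illustrative driver, not part of the module; the path is a placeholder):

```python
# Read a file of any size by requesting consecutive chunks.
import asyncio

from mcp_toolbox.file_ops.tools import read_file_content


async def read_whole_file(path: str, chunk_size: int = 1_000_000) -> str:
    parts: list[str] = []
    index = 0
    while True:
        chunk = await read_file_content(path, chunk_size=chunk_size, chunk_index=index)
        if not chunk["success"]:
            raise RuntimeError(chunk["error"])
        parts.append(chunk["content"])
        # Zero-length files report total_chunks == 0 and never set
        # is_last_chunk, so treat them as complete immediately.
        if chunk["is_last_chunk"] or chunk["total_chunks"] == 0:
            return "".join(parts)
        index += 1


if __name__ == "__main__":
    print(asyncio.run(read_whole_file("README.md")))
```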