#
tokens: 15234/50000 12/12 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── MANIFEST.in
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   └── video_edit_mcp
│       ├── __init__.py
│       ├── audio_operations.py
│       ├── download_utils.py
│       ├── main.py
│       ├── util_tools.py
│       ├── utils.py
│       └── video_operations.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so

# Distribution / packaging
build/
dist/
*.egg-info/
*.egg

# Virtual environments
.env
.venv
env/
venv/
ENV/

# Testing
.pytest_cache/
.coverage
htmlcov/

# Media files (project-specific)
*.mp4
*.avi
*.mov
*.mkv
*.flv
*.wmv
*.webm
*.mp3
*.wav
*.aac
*.ogg
*.flac
*.m4a
*.jpg
*.jpeg
*.png
*.gif
*.bmp
*.tiff
*.svg

# Video processing outputs
video_store/
output/
temp/
downloads/

# IDE
.vscode/
.idea/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db
Desktop.ini

# Temporary files
*.tmp
*.temp

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Video Edit MCP Server 🎬

A powerful **Model Context Protocol (MCP)** server designed for advanced video and audio editing operations. This server enables MCP clients—such as Claude Desktop, Cursor, and others—to perform comprehensive multimedia editing tasks through a standardized and unified interface.

![Python](https://img.shields.io/badge/python-3.10+-blue.svg)
![MCP](https://img.shields.io/badge/MCP-Compatible-purple.svg)
![License](https://img.shields.io/badge/license-MIT-blue.svg)


https://github.com/user-attachments/assets/134b8b82-80b1-4678-8930-ab53121b121f





## ✨ Key Features

### 🎥 Video Operations
- **Basic Editing**: Trim, merge, resize, crop, rotate videos
- **Effects**: Speed control, fade in/out, grayscale, mirror
- **Overlays**: Add text, images, or video overlays with transparency
- **Format Conversion**: Convert between formats with codec control
- **Frame Operations**: Extract frames, create videos from images

### 🎵 Audio Operations  
- **Audio Processing**: Extract, trim, loop, concatenate audio
- **Volume Control**: Adjust levels, fade in/out effects
- **Audio Mixing**: Mix multiple tracks together
- **Integration**: Add audio to videos, replace soundtracks

### 📥 Download & Utilities
- **Video Download**: Download from YouTube and other platforms
- **File Management**: Directory operations, file listing
- **Path Suggestions**: Get recommended download locations

### 🧹 Memory & Cleanup
- **Smart Memory**: Chain operations without saving intermediate files
- **Resource Management**: Clear memory, check stored objects
- **Efficient Processing**: Keep objects in memory for complex workflows

### 🔗 Operation Chaining
Seamlessly chain multiple operations together without creating intermediate files. Process your video through multiple steps (trim → add audio → apply effects → add text) while keeping everything in memory for optimal performance.

## 📋 Requirements

- **Python 3.10 or higher**
- **moviepy==1.0.3**
- **yt-dlp>=2023.1.6**
- **mcp>=1.12.2**
- **typing-extensions>=4.0.0**

## ⚙️ Installation & Setup



### For Claude Desktop / Cursor MCP Integration

**Ensure that `uv` is installed.**  
If not, install it using the following PowerShell command:

```powershell
powershell -ExecutionPolicy Bypass -Command "irm https://astral.sh/uv/install.ps1 | iex"
```

Add this configuration to your MCP configuration file:

```json
{
  "mcpServers": {
    "video_editing": {
      "command": "uvx",
      "args": [
        "--python",
        "3.11",
        "video-edit-mcp"
      ]
    }
  }
}
```

**Configuration file locations:**
- **Claude Desktop (Windows)**: `%APPDATA%/Claude/claude_desktop_config.json`
- **Claude Desktop (macOS)**: `~/Library/Application Support/Claude/claude_desktop_config.json`
- **Cursor**: `.cursor/mcp.json` in your project root

### Manual Installation

```bash
git clone https://github.com/Aditya2755/video-edit-mcp.git
cd video-edit-mcp
pip install -r requirements.txt
pip install -e .
```

## 🏗️ Project Structure

```
video_edit_mcp/
├── src/
│   └── video_edit_mcp/
│       ├── __init__.py
│       ├── main.py                 # MCP server implementation  
│       ├── video_operations.py     # Video editing tools
│       ├── audio_operations.py     # Audio processing tools
│       ├── download_utils.py       # Download functionality
│       ├── util_tools.py          # Memory & utility tools
│       ├── utils.py               # Utility functions
│     
├── pyproject.toml                 # Project configuration
├── requirements.txt               # Dependencies
├── uv.lock                        # Lock file
├── LICENSE                        # MIT License
├── MANIFEST.in                    # Manifest file
└── README.md
```

## 🎯 Example Usage

```python
# Chain operations without intermediate files
video_info = get_video_info("input.mp4")
trimmed = trim_video("input.mp4", 10, 60, return_path=False)  # Keep in memory
with_audio = add_audio(trimmed, "background.mp3", return_path=False)  
final = add_text_overlay(with_audio, "Hello World", x=100, y=50, return_path=True)
```

## 🚀 Future Enhancements & Contributions

We welcome contributions in these exciting areas:

### 🤖 AI-Powered Features
- **Speech-to-Text (STT)**: Automatic subtitle generation and transcription
- **Text-to-Speech (TTS)**: AI voice synthesis for narration
- **Audio Enhancement**: AI-based noise reduction and audio quality improvement
- **Smart Timestamps**: Automatic scene detection and chapter generation
- **Face Tracking**: Advanced face detection and tracking for automatic editing
- **Object Recognition**: Track and edit based on detected objects
- **Content Analysis**: AI-powered content categorization and tagging

## 🤝 Contributing

1. Fork the repository
2. Create a feature branch: `git checkout -b feature/amazing-feature`
3. Commit your changes: `git commit -m 'Add amazing feature'`
4. Push to the branch: `git push origin feature/amazing-feature`
5. Open a Pull Request

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

---

<div align="center">

**Made with ❤️ for the AI and multimedia editing community**

[⭐ Star this project](https://github.com/Aditya2755/video-edit-mcp) | [🤝 Contribute](https://github.com/Aditya2755/video-edit-mcp/contribute) | [📖 Documentation](https://github.com/Aditya2755/video-edit-mcp#readme)

</div>
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
moviepy==1.0.3
yt-dlp>=2023.1.6
mcp>=1.12.2
Pillow==9.5.0

```

--------------------------------------------------------------------------------
/src/video_edit_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""Video Edit MCP Server - A powerful MCP server for video editing operations using MoviePy."""

__version__ = "0.1.1"


from .main import mcp

__all__ = ["mcp"] 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/main.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
import logging
from .video_operations import register_video_tools
from .audio_operations import register_audio_tools
from .download_utils import register_download_and_utility_tools
from .util_tools import register_util_tools
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP()

# Register all tools from different modules
register_video_tools(mcp)
register_audio_tools(mcp)
register_download_and_utility_tools(mcp)
register_util_tools(mcp)

def main():
    """Entry point for the MCP server"""
    mcp.run(transport="streamable-http")

if __name__ == "__main__":
    main()




```

--------------------------------------------------------------------------------
/src/video_edit_mcp/utils.py:
--------------------------------------------------------------------------------

```python
# Simple cross-platform output directory helper
import os
from pathlib import Path
import uuid
from moviepy.editor import VideoFileClip, AudioFileClip
from PIL import Image, ImageDraw, ImageFont
import tempfile
import logging

logger = logging.getLogger(__name__)

def get_output_path(filename: str) -> str:
    """Get cross-platform output path for files"""
    # Use environment variable if set, otherwise default to Downloads
    output_dir = os.environ.get("VIDEO_MCP_OUTPUT_DIR", str(Path.home() / "Downloads" / "video_mcp_output"))
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    return os.path.join(output_dir, filename)

class VideoStore:
    _store = {}

    @classmethod
    def store(cls, video_clip) -> str:
        ref = str(uuid.uuid4())
        cls._store[ref] = video_clip
        return ref

    @classmethod
    def load(cls, video_ref: str):
        if video_ref in cls._store:
            return cls._store[video_ref]
        return VideoFileClip(video_ref)
    
    @classmethod
    def clear(cls):
        cls._store.clear()
    
class AudioStore:
    _store = {}

    @classmethod
    def store(cls, audio_clip) -> str:
        ref = str(uuid.uuid4())
        cls._store[ref] = audio_clip
        return ref
    
    @classmethod
    def load(cls, audio_ref: str):
        if audio_ref in cls._store:
            return cls._store[audio_ref]
        return AudioFileClip(audio_ref)
    
    @classmethod
    def clear(cls):
        cls._store.clear()



```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "video-edit-mcp"
version = "0.1.1"
description = "A powerful Model Context Protocol server for video editing operations using MoviePy"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
    {name = "Aditya_Barasiya", email = "[email protected]"},
]
keywords = ["mcp", "video", "editing", "moviepy", "ai", "server", "claude", "video-processing"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Topic :: Multimedia :: Video",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]

dependencies = [
    "mcp>=1.12.2",
    "moviepy==1.0.3",
    "yt-dlp>=2023.1.6",
    "typing-extensions>=4.0.0",
    "Pillow==9.5.0"
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "black>=22.0",
    "flake8>=4.0",
    "mypy>=1.0",
]

[project.urls]
Homepage = "https://github.com/Aditya2755/video-edit-mcp"
Documentation = "https://github.com/Aditya2755/video-edit-mcp#readme"
Repository = "https://github.com/Aditya2755/video-edit-mcp.git"
"Bug Tracker" = "https://github.com/Aditya2755/video-edit-mcp/issues"

[project.scripts]
video-edit-mcp = "video_edit_mcp.main:main"

[tool.hatch.build.targets.wheel]
packages = ["src/video_edit_mcp"]

[tool.hatch.build.targets.sdist]
include = [
    "/src",
    "/README.md",
    "/LICENSE",
    "/requirements.txt",
]

[tool.black]
line-length = 88
target-version = ['py310'] # Corrected: Changed from 'py38' to 'py310'

[tool.mypy]
python_version = "3.10" # Corrected: Changed from "3.8" to "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false

```

--------------------------------------------------------------------------------
/src/video_edit_mcp/util_tools.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any
from moviepy.editor import VideoFileClip, AudioFileClip
import os
import logging
from .utils import VideoStore, AudioStore

logger = logging.getLogger(__name__)


def register_util_tools(mcp):
    @mcp.tool(description="Use this tool to check what is stored in memory, like objects and etc.")
    def check_memory(store_type: str = "both") -> Dict[str, Any]:
        """
        Check what objects are stored in memory.
        
        Args:
            store_type: Type of store to check ("video", "audio", or "both")
        """
        try:
            if store_type.lower() == "video":
                return {
                    "success": True,
                    "video_memory": VideoStore._store,
                    "video_count": len(VideoStore._store)
                }
            elif store_type.lower() == "audio":
                return {
                    "success": True,
                    "audio_memory": AudioStore._store,
                    "audio_count": len(AudioStore._store)
                }
            else:  # both or any other value
                return {
                    "success": True,
                    "video_memory": VideoStore._store,
                    "audio_memory": AudioStore._store,
                    "video_count": len(VideoStore._store),
                    "audio_count": len(AudioStore._store),
                    "total_objects": len(VideoStore._store) + len(AudioStore._store)
                }
        except Exception as e:
            logger.error(f"Error checking memory: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error checking memory"
            }

    @mcp.tool(description="Use this tool for clearing all stored video and audio objects from memory to free up space")
    def clear_memory(clear_videos:bool, clear_audios:bool) -> Dict[str,Any]:
        try:
            if clear_videos:
                VideoStore.clear()
            if clear_audios:
                AudioStore.clear()
            return {
                "success": True,
                "message": f"Memory cleared - Videos: {clear_videos}, Audios: {clear_audios}"
            }
        except Exception as e:
            logger.error(f"Error clearing memory: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error clearing memory"
            }

    @mcp.tool(description="Use this tool for listing files in a directory, provide directory path")
    def list_files(directory_path: str) -> Dict[str, Any]:
        try:
            if not os.path.exists(directory_path):
                return {
                    "success": False,
                    "error": "Directory does not exist",
                    "message": "Provide valid directory path"
                }
            files = os.listdir(directory_path)
            return {
                "success": True,
                "files": files
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Error listing files"
            }
        
    @mcp.tool(description="Use this tool to create a directory to store output files, make sure to provide accurate path")
    def make_directory(directory_path: str) -> Dict[str, Any]:
        try:
            os.makedirs(directory_path, exist_ok=True)
            return {
                "success": True,
                "message": "Directory created successfully"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Error creating directory"
            } 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/download_utils.py:
--------------------------------------------------------------------------------

```python
import yt_dlp
import os
import logging
from typing import Dict, Any, Optional, List
from .utils import VideoStore, AudioStore

logger = logging.getLogger(__name__)

def get_default_download_paths():
    """Get common download directory paths"""
    user_home = os.path.expanduser("~")
    return {
        "user_downloads": os.path.join(user_home, "Downloads"),
        "desktop": os.path.join(user_home, "Desktop"), 
        "documents": os.path.join(user_home, "Documents"),
        "project_root": os.path.abspath(".") if "video_edit_mcp" in os.path.abspath(".") else None
    }

def register_download_and_utility_tools(mcp):
    """Register all download and utility tools with the MCP server"""
    @mcp.tool(description= "use this tool to download videos make sure to give proper path for saving video not just name, if there are multiple steps to be done after downloading then make sure to return object and return path should be false else return path should be true")
    def download_video(
        url: str,
        save_path: Optional[str] = None,
        audio_only: bool = False,
        **yt_dlp_options: Any
    ) -> Dict[str, Any]:
        """
        Download video or audio from URL using yt-dlp
        
        Args:
            url: URL to download from
            save_path: Directory path or full file path template where to save
            audio_only: If True, download and extract audio only
            **yt_dlp_options: Additional yt-dlp options
            
        Returns:
            Dict with success status, file path, and other info
        """
        try:
            # Start with user-provided yt-dlp options
            ydl_opts = yt_dlp_options.copy()
            
            # --- Handle Core Parameters ---
            
            # 1. Set output path template if not already specified by user
            if 'outtmpl' not in ydl_opts:
                if save_path:
                    # Check if user provided a relative path that might cause confusion
                    if not os.path.isabs(save_path) and not save_path.startswith('.'):
                        logger.warning(f"Relative path detected: '{save_path}'. This will save relative to current directory: {os.getcwd()}")
                        
                    # Convert save_path to absolute path first
                    save_path = os.path.abspath(save_path)
                    
                    if os.path.isdir(save_path):
                        # save_path is a directory
                        ydl_opts['outtmpl'] = os.path.join(save_path, '%(title)s [%(id)s].%(ext)s')
                    else:
                        # save_path could be a full path or template
                        # Check if it contains yt-dlp template variables
                        if '%(' in save_path:
                            ydl_opts['outtmpl'] = save_path
                        else:
                            # Treat as full file path, ensure directory exists
                            directory = os.path.dirname(save_path)
                            if directory and not os.path.exists(directory):
                                os.makedirs(directory, exist_ok=True)
                            ydl_opts['outtmpl'] = save_path
                else:
                    # Default to user's Downloads folder instead of current directory
                    default_paths = get_default_download_paths()
                    downloads_dir = default_paths["user_downloads"]
                    
                    # Create Downloads directory if it doesn't exist
                    os.makedirs(downloads_dir, exist_ok=True)
                    ydl_opts['outtmpl'] = os.path.join(downloads_dir, '%(title)s [%(id)s].%(ext)s')

            # 2. Configure for audio-only downloads
            if audio_only:
                # Set format to best audio
                ydl_opts['format'] = 'bestaudio/best'
                # Ensure post-processor is set to extract audio (defaults to mp3)
                if 'postprocessors' not in ydl_opts:
                    ydl_opts['postprocessors'] = []
                
                # Add audio extraction post-processor
                audio_postprocessor = {
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': ydl_opts.get('audio_format', 'mp3'),
                    'preferredquality': ydl_opts.get('audio_quality', '192'),
                }
                ydl_opts['postprocessors'].append(audio_postprocessor)

            # Add logger
            ydl_opts['logger'] = logger

            # --- Execute Download ---
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Extract info and download
                info = ydl.extract_info(url, download=not ydl_opts.get('simulate', False))
                
                # Prepare response
                response = {
                    "success": True,
                    "message": "Operation successful!",
                    "title": info.get('title', 'Unknown'),
                    "id": info.get('id', 'Unknown'),
                    "duration": info.get('duration'),
                    "uploader": info.get('uploader')
                }
                
                # If not simulating, get the actual file path
                if not ydl_opts.get('simulate', False):
                    # Get the base filename that yt-dlp would use
                    base_filepath = ydl.prepare_filename(info)
                    
                    # Convert to absolute path
                    base_filepath = os.path.abspath(base_filepath)
                    
                    # For audio-only downloads, the file extension will change
                    if audio_only:
                        # Find the audio codec used
                        audio_codec = ydl_opts.get('audio_format', 'mp3')
                        # Change extension to match the extracted audio format
                        base_name = os.path.splitext(base_filepath)[0]
                        actual_filepath = f"{base_name}.{audio_codec}"
                    else:
                        actual_filepath = base_filepath
                    
                    # Ensure we have absolute path
                    actual_filepath = os.path.abspath(actual_filepath)
                    
                    # Verify the file exists
                    if os.path.exists(actual_filepath):
                        response.update({
                            "filepath": actual_filepath,
                            "filename": os.path.basename(actual_filepath),
                            "directory": os.path.dirname(actual_filepath),
                            "file_size": os.path.getsize(actual_filepath)
                        })
                    else:
                        # If exact path doesn't exist, try to find the actual file
                        # This handles cases where yt-dlp might have modified the filename
                        directory = os.path.dirname(actual_filepath)
                        base_name = os.path.splitext(os.path.basename(actual_filepath))[0]
                        
                        # Look for files with similar names in the directory
                        if os.path.exists(directory):
                            found_file = None
                            for file in os.listdir(directory):
                                if base_name in file or info.get('id', '') in file:
                                    found_file = file
                                    break
                            
                            if found_file:
                                found_filepath = os.path.abspath(os.path.join(directory, found_file))
                                response.update({
                                    "filepath": found_filepath,
                                    "filename": found_file,
                                    "directory": os.path.dirname(found_filepath),
                                    "file_size": os.path.getsize(found_filepath),
                                    "note": "Filename was modified during download"
                                })
                            else:
                                response.update({
                                    "filepath": actual_filepath,
                                    "filename": os.path.basename(actual_filepath),
                                    "directory": os.path.dirname(actual_filepath),
                                    "warning": "File path expected but not found at exact location",
                                    "expected_path": actual_filepath
                                })
                        else:
                            response.update({
                                "filepath": actual_filepath,
                                "filename": os.path.basename(actual_filepath),
                                "directory": os.path.dirname(actual_filepath),
                                "warning": "Directory not found",
                                "expected_path": actual_filepath
                            })
                else:
                    # Simulation mode
                    simulated_path = ydl.prepare_filename(info)
                    response["simulated_filepath"] = os.path.abspath(simulated_path)
                    response["message"] = "Simulation successful - no file downloaded"

                return response
                
        except Exception as e:
            logger.error(f"Error during video download from {url}: {e}", exc_info=True)
            return {
                "success": False, 
                "error": str(e), 
                "error_type": type(e).__name__,
                "url": url,
                "message": "Download failed"
            }

    @mcp.tool(description="Get suggested download directory paths for saving videos/audio files")
    def get_download_paths() -> Dict[str, Any]:
        """
        Get suggested common download directory paths
        
        Returns:
            Dict with suggested download paths
        """
        try:
            paths = get_default_download_paths()
            current_dir = os.getcwd()
            
            return {
                "success": True,
                "current_working_directory": current_dir,
                "suggested_paths": {
                    "user_downloads": paths["user_downloads"],
                    "desktop": paths["desktop"],
                    "documents": paths["documents"],
                    "project_root": paths["project_root"],
                    "current_directory": current_dir
                },
                "usage_examples": {
                    "save_to_downloads": paths["user_downloads"],
                    "save_to_custom_folder": os.path.join(paths["user_downloads"], "YouTube_Videos"),
                    "save_to_desktop": paths["desktop"]
                },
                "note": "Use full absolute paths to avoid saving in unexpected locations"
            }
        except Exception as e:
            logger.error(f"Error getting download paths: {e}")
            return {
                "success": False,
                "error": str(e),
                "current_directory": os.getcwd(),
                "message": "Error getting download paths"
            }

    
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/audio_operations.py:
--------------------------------------------------------------------------------

```python
from moviepy.editor import *
from moviepy.audio.fx.all import audio_loop
from moviepy.editor import CompositeAudioClip
from typing import Dict, Any, List
import os
import logging
from .utils import get_output_path, AudioStore

logger = logging.getLogger(__name__)

def register_audio_tools(mcp):
    """Register all audio processing tools with the MCP server"""
    
    @mcp.tool(description="get audio info")
    def audio_info(audio_path:str) -> Dict[str,Any]:
        try:
            
            audio = AudioFileClip(audio_path)
            return{
                "success": True,
                "audio_info": {
                    "duration": audio.duration,
                    "fps": audio.fps,
                    "channels": audio.nchannels
                }
            }
        except Exception as e:
            logger.error(f"Error getting audio info for {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__
            }

    @mcp.tool(description="Use this tool for extracting audio from the video , and make sure only give output name like extracted_audio.mp3 , some_hello.mp3 etc.. don't pass path just give meaningful names based on audio info, if there are multiple steps to be done after extracting audio then make sure to return object and return path should be false else return path should be true")
    def extract_audio(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            from .utils import VideoStore
            video = VideoStore.load(video_path)
            
            if video.audio is None:
                return {
                    "success": False,
                    "error": "Video has no audio track",
                    "message": "No audio to extract"
                }
            
            audio = video.audio
            if return_path:
                audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio extracted successfully"
                }
            else:
                ref = AudioStore.store(audio)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error extracting audio from {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error extracting audio"
            }

    @mcp.tool(description="Trim audio file, make sure to provide proper start and end time, and make sure to provide output name like trimmed_audio.mp3, some_hello.mp3 etc. don't pass path just give meaningful names based on audio info, if there are multiple steps to be done after trimming then make sure to return object and return path should be false else return path should be true")
    def trim_audio(audio_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if start_time < 0 or end_time < 0:
                return {
                    "success": False,
                    "error": "Start and end times must be positive",
                    "message": "Invalid time parameters"
                }
            if start_time >= end_time:
                return {
                    "success": False,
                    "error": "Start time must be less than end time",
                    "message": "Invalid time range"
                }
            
            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            trimmed_audio = audio.subclip(start_time, end_time)
            
            if return_path:
                trimmed_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio trimmed successfully"
                }
            else:
                ref = AudioStore.store(trimmed_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio trimmed successfully"
                }
        except Exception as e:
            logger.error(f"Error trimming audio {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error trimming audio"
            }

    @mcp.tool(description="Use this tool to concatenate two audios, if there are multiple steps to be done after concatenating then make sure to return object and return path should be false else return path should be true")
    def concatenate_audio(audio_path_1: str, audio_path_2: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            audio_1 = AudioStore.load(audio_path_1)
            audio_2 = AudioStore.load(audio_path_2)
            concatenated_audio = concatenate_audioclips([audio_1, audio_2])
            
            if return_path:
                concatenated_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio concatenated successfully"
                }
            else:
                ref = AudioStore.store(concatenated_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio concatenated successfully"
                }
        except Exception as e:
            logger.error(f"Error concatenating audio files {audio_path_1} and {audio_path_2}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error concatenating audio files"
            }

    @mcp.tool(description="Use this tool to loop the audio, after looping make sure to provide output name like looped_audio.mp3, some_hello.mp3 etc. don't pass path just give meaningful names based on audio info and also make sure to provide duration in seconds, if there are multiple steps to be done after looping then make sure to return object and return path should be false else return path should be true")
    def loop_audio(audio_path: str, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if duration <= 0:
                return {
                    "success": False,
                    "error": "Duration must be positive",
                    "message": "Invalid duration parameter"
                }
            
            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            looped_audio = audio_loop(audio, duration=duration)
            
            if return_path:
                looped_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio looped successfully"
                }
            else:
                ref = AudioStore.store(looped_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio looped successfully"
                }
        except Exception as e:
            logger.error(f"Error looping audio {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error looping audio"
            }

    @mcp.tool(description="Adjust volume of an audio, if there are multiple steps to be done after adjusting volume then make sure to return object and return path should be false else return path should be true")
    def adjust_vol(audio_path: str, volume_level: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if volume_level <= 0:
                return {
                    "success": False,
                    "error": "Volume level must be positive (e.g., 1.0 for normal, 2.0 for double)",
                    "message": "Invalid volume level parameter"
                }
            
            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            audio_adjusted = audio.volumex(volume_level)
            
            if return_path:
                audio_adjusted.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio volume adjusted successfully"
                }
            else:
                ref = AudioStore.store(audio_adjusted)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio volume adjusted successfully"
                }
        except Exception as e:
            logger.error(f"Error adjusting audio volume {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adjusting audio volume"
            }

    @mcp.tool(description="Use this tool for audio fade in effect, provide fade_duration in seconds, and output name like fadein_audio.mp3, if there are multiple steps to be done after adding fade in then make sure to return object and return path should be false else return path should be true")
    def fadein_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            fadein_audio = audio.audio_fadein(fade_duration)
            if return_path:
                fadein_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio fade in effect added successfully"
                }
            else:
                ref = AudioStore.store(fadein_audio)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding audio fade in effect {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding audio fade in effect"
            }

    @mcp.tool(description="Use this tool for audio fade out effect, provide fade_duration in seconds, and output name like fadeout_audio.mp3, if there are multiple steps to be done after adding fade out then make sure to return object and return path should be false else return path should be true")
    def fadeout_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            fadeout_audio = audio.audio_fadeout(fade_duration)
            if return_path:
                fadeout_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio fade out effect added successfully"
                }
            else:
                ref = AudioStore.store(fadeout_audio)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding audio fade out effect {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding audio fade out effect"
            }

    @mcp.tool(description="Use this tool for mixing multiple audio tracks together, provide list of audio paths and output name, if there are multiple steps to be done after mixing audio tracks then make sure to return object and return path should be false else return path should be true")
    def mix_audio_tracks(audio_paths:List[str], output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            audio_clips = [AudioStore.load(clips) for clips in audio_paths]
            mixed_audio = CompositeAudioClip(audio_clips)
            mixed_audio.fps = 44100
            if return_path:
                    try:
                        mixed_audio.write_audiofile(output_path)
                        return {
                        "success": True,
                        "output_path": output_path,
                        "message": "Audio tracks mixed successfully"
                    }
                    except Exception as write_error:
                        return {
                        "success": False,
                        "error": f"Failed to write file: {str(write_error)}",
                        "message": "File writing failed"
                        }
            else:
                    return {
                    "success": True,
                    "output_object": mixed_audio
                    }

        except Exception as e:
            return {
                "success": False,
                "error": f"Unexpected error: {str(e)}",
                "error_type": type(e).__name__,
                "message": "Unexpected error occurred"
            } 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/video_operations.py:
--------------------------------------------------------------------------------

```python
from moviepy.editor import *
from moviepy.video.fx import *
from moviepy.video.fx.speedx import speedx
from moviepy.video.fx.rotate import rotate
from moviepy.video.fx.crop import crop
from moviepy.video.fx.resize import resize
from moviepy.video.fx.fadein import fadein
from moviepy.video.fx.fadeout import fadeout
from moviepy.video.fx.blackwhite import blackwhite
from moviepy.video.fx.mirror_x import mirror_x
from moviepy.editor import ImageClip, CompositeVideoClip, ImageSequenceClip, TextClip
from typing import Dict, Any, Optional, List, Tuple
import os
import logging
import imageio
from .utils import get_output_path, VideoStore, AudioStore
import moviepy.config as mpy_conf


logger = logging.getLogger(__name__)

def register_video_tools(mcp):
    """Register all video processing tools with the MCP server"""
    
    @mcp.tool()
    def get_video_info(video_path: str) -> Dict[str, Any]:
        """Get comprehensive information about a video file including duration, fps, resolution, codec details, and audio information."""
        try:
            # Load video file
            video = VideoFileClip(video_path)
            
            # Basic video information
            info = {
                "file_path": video_path,
                "filename": os.path.basename(video_path),
                "duration": video.duration,
                "fps": video.fps,
                "size": video.size,  # (width, height)
                "width": video.w,
                "height": video.h,
                "aspect_ratio": round(video.w / video.h, 2) if video.h > 0 else None,
            }
            
            # Add reader/codec information if available
            if hasattr(video, 'reader') and video.reader:
                reader_info = {
                    "nframes": getattr(video.reader, 'nframes', None),
                    "bitrate": getattr(video.reader, 'bitrate', None),
                    "codec": getattr(video.reader, 'codec', None),
                    "pix_fmt": getattr(video.reader, 'pix_fmt', None),
                }
                # Only add non-None values
                info.update({k: v for k, v in reader_info.items() if v is not None})
            
            # Audio information
            if video.audio is not None:
                audio_info = {
                    "has_audio": True,
                    "audio_duration": video.audio.duration,
                    "audio_fps": video.audio.fps,
                    "audio_channels": getattr(video.audio, 'nchannels', None),
                }
                
                # Add audio reader info if available
                if hasattr(video.audio, 'reader') and video.audio.reader:
                    audio_reader_info = {
                        "audio_bitrate": getattr(video.audio.reader, 'bitrate', None),
                        "audio_codec": getattr(video.audio.reader, 'codec', None),
                        "sample_rate": getattr(video.audio.reader, 'fps', None),
                    }
                    # Only add non-None values
                    audio_info.update({k: v for k, v in audio_reader_info.items() if v is not None})
            else:
                audio_info = {
                    "has_audio": False,
                    "audio_duration": None,
                    "audio_fps": None,
                    "audio_channels": None,
                }
            
            info.update(audio_info)
            
            # File size information
            try:
                file_size = os.path.getsize(video_path)
                info["file_size_bytes"] = file_size
                info["file_size_mb"] = round(file_size / (1024 * 1024), 2)
            except OSError:
                info["file_size_bytes"] = None
                info["file_size_mb"] = None
            
            # Calculate video quality metrics
            if info["duration"] and info["duration"] > 0:
                info["total_frames"] = int(info["fps"] * info["duration"]) if info["fps"] else None
                if info.get("file_size_bytes"):
                    info["average_bitrate_kbps"] = round((info["file_size_bytes"] * 8) / (info["duration"] * 1000), 2)
            
            # Clean up video object to prevent memory leaks
            video.close()
            
            return {
                "success": True,
                "video_info": info
            }
            
        except Exception as e:
            logger.error(f"Error getting video info for {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__
            }
        finally:
            # Ensure video object is cleaned up even if an exception occurs
            try:
                if 'video' in locals():
                    video.close()
            except:
                pass

    @mcp.tool(description="Use this tool for trimming the video, provide start and end time in seconds, and output name like trimmed_video.mp4 , if there are multiple steps to be done after trimming then make sure to return object and return path should be false else return path should be true")
    def trim_video(video_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if start_time < 0 or end_time < 0:
                return {
                    "success": False,
                    "error": "Start and end times must be positive",
                    "message": "Invalid time parameters"
                }
            if start_time >= end_time:
                return {
                    "success": False,
                    "error": "Start time must be less than end time",
                    "message": "Invalid time range"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            trimmed_video = video.subclip(start_time, end_time)

            if return_path:
                trimmed_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video trimmed successfully"
                }
            else:
                ref = VideoStore.store(trimmed_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error trimming video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error trimming video"
            }

    @mcp.tool(description="Use this tool for merging two videos, provide two video paths, and output name like merged_video.mp4 , if there are multiple steps to be done after merging then make sure to return object and return path should be false else return path should be true")
    def merge_video(video_path: str, video_path2: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        """Merge two videos into one."""
        try:
            output_path = get_output_path(output_name)
            video1 = VideoStore.load(video_path)
            video2 = VideoStore.load(video_path2)
            merged_video = concatenate_videoclips([video1, video2])
            if return_path:
                merged_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Videos merged successfully"
                }
            else:
                ref = VideoStore.store(merged_video)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Videos merged successfully"
                }
        except Exception as e:
            logger.error(f"Error merging videos {video_path} and {video_path2}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error merging videos"
            }

 

    @mcp.tool(description="Use this tool for resizing the video make sure first whether video needs to be saved directly or just object has to be returned for further processing, if there are multiple steps to be done after resizing then make sure to return object and return path should be false else return path should be true")
    def resize_video(video_path: str, size: Tuple[int, int], output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if not size or len(size) != 2 or size[0] <= 0 or size[1] <= 0:
                return {
                    "success": False,
                    "error": "Size must be a tuple of two positive integers (width, height)",
                    "message": "Invalid size parameters"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            resized_video = video.resize(newsize=size)
            
            if return_path:
                resized_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video resized successfully"
                }
            else:
                ref = VideoStore.store(resized_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error resizing video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error resizing video"
            }

    @mcp.tool(description="Use this tool for cropping the video, provide x1, y1, x2, y2 coordinates, and output name like cropped_video.mp4 , if there are multiple steps to be done after cropping then make sure to return object and return path should be false else return path should be true")
    def crop_video(video_path: str, x1: int, y1: int, x2: int, y2: int, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1:
                return {
                    "success": False,
                    "error": "Invalid crop coordinates. x2 > x1 and y2 > y1, all values must be non-negative",
                    "message": "Invalid crop parameters"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            cropped_video = crop(video, x1, y1, x2, y2)
            
            if return_path:
                cropped_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video cropped successfully"
                }
            else:
                ref = VideoStore.store(cropped_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error cropping video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error cropping video"
            }

    @mcp.tool(description="Use this tool for rotating the video, and make sure to provide output name like rotated_video.mp4 , some_hello.mp4 etc. don't pass path just give meaningful names based on video info, if there are multiple steps to be done after rotating then make sure to return object and return path should be false else return path should be true")
    def rotate_video(video_path: str, angle: int, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if not isinstance(angle, (int, float)):
                return {
                    "success": False,
                    "error": "Angle must be a number",
                    "message": "Invalid angle parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            rotated_video = rotate(video, angle)
            
            if return_path:
                rotated_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video rotated successfully"
                }
            else:
                ref = VideoStore.store(rotated_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error rotating video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error rotating video"
            }

    @mcp.tool(description="Use this tool for speed up the video, and make sure to provide output name like speed_up_video.mp4 , some_hello.mp4 etc. don't pass path just give meaningful names based on video info, if there are multiple steps to be done after speed up then make sure to return object and return path should be false else return path should be true")
    def speed_up_video(video_path: str, speed: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if speed <= 0:
                return {
                    "success": False,
                    "error": "Speed must be positive (e.g., 2.0 for 2x speed)",
                    "message": "Invalid speed parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            sped_up_video = speedx(video, speed)

            if return_path:
                sped_up_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video speed changed successfully"
                }
            else:
                ref = VideoStore.store(sped_up_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error changing video speed {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error changing video speed"
            }

    @mcp.tool(description="Use this tool for adding audio to the video , and make sure to provide output file name like added_audio.mp4 etc. make sure mp4 extension is provided and name should be meaningful, if there are multiple steps to be done after adding audio then make sure to return object and return path should be false else return path should be true")
    def add_audio(video_path: str, audio_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            audio = AudioStore.load(audio_path)
            new_video = video.set_audio(audio)
            
            if return_path:
                new_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio added successfully"
                }
            else:
                ref = VideoStore.store(new_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding audio to video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding audio to video"
            }


    @mcp.tool(description="Use this tool for adding fade in effect to video, provide fade_duration in seconds, and output name like fadein_video.mp4, if there are multiple steps to be done after adding fade in then make sure to return object and return path should be false else return path should be true")
    def fadein_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if fade_duration <= 0:
                return {
                    "success": False,
                    "error": "Fade duration must be positive",
                    "message": "Invalid fade duration parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            faded_video = fadein(video, fade_duration)
            
            if return_path:
                faded_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Fade in effect added successfully"
                }
            else:
                ref = VideoStore.store(faded_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding fade in effect to video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding fade in effect"
            }

    @mcp.tool(description="Use this tool for adding fade out effect to video, provide fade_duration in seconds, and output name like fadeout_video.mp4, if there are multiple steps to be done after adding fade out then make sure to return object and return path should be false else return path should be true")
    def fadeout_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if fade_duration <= 0:
                return {
                    "success": False,
                    "error": "Fade duration must be positive",
                    "message": "Invalid fade duration parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            faded_video = fadeout(video, fade_duration)
            
            if return_path:
                faded_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Fade out effect added successfully"
                }
            else:
                ref = VideoStore.store(faded_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding fade out effect to video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding fade out effect"
            }

    @mcp.tool(description="Use this tool for adding text overlay to video, provide text, position coordinates (x,y), font_size, color, and output name, if there are multiple steps to be done after adding text overlay then make sure to return object and return path should be false else return path should be true")
    def add_text_overlay(video_path: str, text: str, x: int, y: int, font_size: int, color: str, duration: float, output_name: str, return_path: bool, path_of_imagemagick: str) -> Dict[str, Any]:
        try:
            # Input validation
            if not text or not text.strip():
                return {
                    "success": False,
                    "error": "Text cannot be empty",
                    "message": "Invalid text parameter"
                }
            if font_size <= 0:
                return {
                    "success": False,
                    "error": "Font size must be positive",
                    "message": "Invalid font size parameter"
                }
            if duration <= 0:
                return {
                    "success": False,
                    "error": "Duration must be positive",
                    "message": "Invalid duration parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            
            # Configure ImageMagick
            mpy_conf.change_settings({"IMAGEMAGICK_BINARY": path_of_imagemagick})
            
            # Create a TextClip with specified parameters
            text_clip = TextClip(text, fontsize=font_size, color=color)
            
            # Set position using the provided x, y coordinates and duration
            text_clip = text_clip.set_position((x, y)).set_duration(duration)
            
            # Overlay text on video
            final_video = CompositeVideoClip([video, text_clip])
            
            if return_path:
                final_video.write_videofile(output_path, fps=final_video.fps)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Text overlay added successfully"
                }
            else:
                ref = VideoStore.store(final_video)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Text overlay added successfully"
                }

        except Exception as e:
            logger.error(f"Error adding text overlay to video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding text overlay. Make sure ImageMagick is installed and path is correct."
            }

    @mcp.tool(description="Use this tool for adding image watermark/overlay to video, provide image_path, position coordinates (x,y), and output name, if there are multiple steps to be done after adding image overlay then make sure to return object and return path should be false else return path should be true")
    def add_image_overlay(video_path: str, image_path: str, x: int, y: int, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if duration <= 0:
                return {
                    "success": False,
                    "error": "Duration must be positive",
                    "message": "Invalid duration parameter"
                }
            
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            logo = ImageClip(image_path).set_duration(duration).set_position((x, y))
            final_video = CompositeVideoClip([video, logo])
            
            if return_path:
                final_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Image overlay added successfully"
                }
            else:
                ref = VideoStore.store(final_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding image overlay to video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding image overlay"
            }

    @mcp.tool(description="Use this tool for converting video to grayscale/black and white, provide output name like grayscale_video.mp4, if there are multiple steps to be done after converting to grayscale then make sure to return object and return path should be false else return path should be true")
    def grayscale_video(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            gray_video = video.fx(blackwhite)
            
            if return_path:
                gray_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video converted to grayscale successfully"
                }
            else:
                ref = VideoStore.store(gray_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error converting video to grayscale {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error converting video to grayscale"
            }

    @mcp.tool(description="Use this tool for creating video from image sequence, provide folder path with images, fps, and output name, if there are multiple steps to be done after creating video from images then make sure to return object and return path should be false else return path should be true")
    def images_to_video(images_folder_path:str, fps:int, output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            clip = ImageSequenceClip(images_folder_path, fps=fps)
            if return_path:
                clip.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video created from images successfully"
                }
            else:
                ref = VideoStore.store(clip)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error creating video from images {images_folder_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error creating video from images"
            }

    @mcp.tool(description="Use this tool for extracting frames from video as images, provide start_time, end_time, and fps for extraction, if there are multiple steps to be done after extracting frames then make sure to return object and return path should be false else return path should be true")
    def extract_frames(video_path:str, start_time:float, end_time:float, fps:int, output_folder_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            video = VideoStore.load(video_path)
            subclip = video.subclip(start_time, end_time)
            if return_path:
                os.makedirs(output_folder_name, exist_ok=True)
                for i, frame in enumerate(subclip.iter_frames(fps=fps)):
                    frame_path = os.path.join(output_folder_name, f"frame_{i:04d}.png")
                    imageio.imwrite(frame_path, frame)
                return {
                    "success": True,
                    "output_path": output_folder_name,
                    "message": "Frames extracted successfully"
                }
            else:
                frames = list(subclip.iter_frames(fps=fps))
                return {
                    "success": True,
                    "output_object": frames,
                    "message": "Frames extracted to memory"
                }
        except Exception as e:
            logger.error(f"Error extracting frames from video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error extracting frames from video"
            }

    @mcp.tool(description="Use this tool for mirroring video horizontally, provide output name like mirrored_video.mp4, if there are multiple steps to be done after mirroring then make sure to return object and return path should be false else return path should be true")
    def mirror_video(video_path:str, output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            mirrored_video = video.fx(mirror_x)
            if return_path:
                mirrored_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video mirrored successfully"
                }
            else:
                ref = VideoStore.store(mirrored_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error mirroring video {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error mirroring video"
            }

    @mcp.tool(description="Use this tool for splitting video into multiple parts at specific timestamps, provide list of split times in seconds, if there are multiple steps to be done after splitting then make sure to return object and return path should be false else return path should be true")
    def split_video_at_times(video_path:str, split_times:List[float], output_name:str, return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            segments = []
            split_times = [0] + split_times + [video.duration]
            
            for i in range(len(split_times) - 1):
                start = split_times[i]
                end = split_times[i + 1]
                segment = video.subclip(start, end)
                segments.append(segment)
                
            if return_path:
                output_paths = []
                for i, segment in enumerate(segments):
                    segment_path = os.path.join(output_path, f"{output_name}_part_{i+1}.mp4")
                    segment.write_videofile(segment_path)
                    output_paths.append(segment_path)
                return {
                    "success": True,
                    "output_paths": output_paths,
                    "message": "Video split successfully"
                }
            else:
                refs = [VideoStore.store(segment) for segment in segments]
                return {
                    "success": True,
                    "output_objects": refs
                }
        except Exception as e:
            logger.error(f"Error splitting video at times {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error splitting video at times"
            }

    @mcp.tool(description="Use this tool for converting video format with codec and quality control, provide codec, fps, bitrate, if there are multiple steps to be done after converting video format then make sure to return object and return path should be false else return path should be true")
    def convert_video_format(video_path:str, output_name:str, codec:str, fps:Optional[int], bitrate:Optional[str], return_path:bool) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            video = VideoStore.load(video_path)
            write_kwargs = {"codec": codec}
            if fps:
                write_kwargs["fps"] = fps
            if bitrate:
                write_kwargs["bitrate"] = bitrate
                
            if return_path:
                video.write_videofile(output_path, **write_kwargs)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video format converted successfully"
                }
            else:
                ref = VideoStore.store(video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error converting video format {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error converting video format"
            }

    @mcp.tool(description="Use this tool for adding video overlay with transparency, provide overlay video, position, and opacity (0-1), if there are multiple steps to be done after adding video overlay then make sure to return object and return path should be false else return path should be true")
    def add_video_overlay(base_video_path:str, overlay_video_path:str, x:int, y:int, opacity:float, output_name:str, return_path:bool,duration:float) -> Dict[str,Any]:
        try:
            output_path = get_output_path(output_name)
            base_video = VideoStore.load(base_video_path)
            overlay_video = VideoStore.load(overlay_video_path)
            
            overlay_positioned = overlay_video.set_position((x, y)).set_opacity(opacity).set_duration(duration)
            final_video = CompositeVideoClip([base_video, overlay_positioned])
            
            if return_path:
                final_video.write_videofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Video overlay added successfully"
                }
            else:
                ref = VideoStore.store(final_video)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error adding video overlay {base_video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error adding video overlay"
            } 
        

```