# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── MANIFEST.in
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   └── video_edit_mcp
│       ├── __init__.py
│       ├── audio_operations.py
│       ├── download_utils.py
│       ├── main.py
│       ├── util_tools.py
│       ├── utils.py
│       └── video_operations.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python
 2 | __pycache__/
 3 | *.py[cod]
 4 | *$py.class
 5 | *.so
 6 | 
 7 | # Distribution / packaging
 8 | build/
 9 | dist/
10 | *.egg-info/
11 | *.egg
12 | 
13 | # Virtual environments
14 | .env
15 | .venv
16 | env/
17 | venv/
18 | ENV/
19 | 
20 | # Testing
21 | .pytest_cache/
22 | .coverage
23 | htmlcov/
24 | 
25 | # Media files (project-specific)
26 | *.mp4
27 | *.avi
28 | *.mov
29 | *.mkv
30 | *.flv
31 | *.wmv
32 | *.webm
33 | *.mp3
34 | *.wav
35 | *.aac
36 | *.ogg
37 | *.flac
38 | *.m4a
39 | *.jpg
40 | *.jpeg
41 | *.png
42 | *.gif
43 | *.bmp
44 | *.tiff
45 | *.svg
46 | 
47 | # Video processing outputs
48 | video_store/
49 | output/
50 | temp/
51 | downloads/
52 | 
53 | # IDE
54 | .vscode/
55 | .idea/
56 | *.swp
57 | *.swo
58 | 
59 | # OS
60 | .DS_Store
61 | Thumbs.db
62 | Desktop.ini
63 | 
64 | # Temporary files
65 | *.tmp
66 | *.temp
67 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Video Edit MCP Server 🎬
  2 | 
  3 | A powerful **Model Context Protocol (MCP)** server designed for advanced video and audio editing operations. This server enables MCP clients—such as Claude Desktop, Cursor, and others—to perform comprehensive multimedia editing tasks through a standardized and unified interface.
  4 | 
  5 | ![Python](https://img.shields.io/badge/python-3.10+-blue.svg)
  6 | ![MCP](https://img.shields.io/badge/MCP-Compatible-purple.svg)
  7 | ![License](https://img.shields.io/badge/license-MIT-blue.svg)
  8 | 
  9 | 
 10 | https://github.com/user-attachments/assets/134b8b82-80b1-4678-8930-ab53121b121f
 11 | 
 12 | 
 13 | 
 14 | 
 15 | 
 16 | ## ✨ Key Features
 17 | 
 18 | ### 🎥 Video Operations
 19 | - **Basic Editing**: Trim, merge, resize, crop, rotate videos
 20 | - **Effects**: Speed control, fade in/out, grayscale, mirror
 21 | - **Overlays**: Add text, images, or video overlays with transparency
 22 | - **Format Conversion**: Convert between formats with codec control
 23 | - **Frame Operations**: Extract frames, create videos from images
 24 | 
 25 | ### 🎵 Audio Operations  
 26 | - **Audio Processing**: Extract, trim, loop, concatenate audio
 27 | - **Volume Control**: Adjust levels, fade in/out effects
 28 | - **Audio Mixing**: Mix multiple tracks together
 29 | - **Integration**: Add audio to videos, replace soundtracks
 30 | 
 31 | ### 📥 Download & Utilities
 32 | - **Video Download**: Download from YouTube and other platforms
 33 | - **File Management**: Directory operations, file listing
 34 | - **Path Suggestions**: Get recommended download locations
 35 | 
 36 | ### 🧹 Memory & Cleanup
 37 | - **Smart Memory**: Chain operations without saving intermediate files
 38 | - **Resource Management**: Clear memory, check stored objects
 39 | - **Efficient Processing**: Keep objects in memory for complex workflows
 40 | 
 41 | ### 🔗 Operation Chaining
 42 | Seamlessly chain multiple operations together without creating intermediate files. Process your video through multiple steps (trim → add audio → apply effects → add text) while keeping everything in memory for optimal performance.
 43 | 
 44 | ## 📋 Requirements
 45 | 
 46 | - **Python 3.10 or higher**
 47 | - **moviepy==1.0.3**
 48 | - **yt-dlp>=2023.1.6**
 49 | - **mcp>=1.12.2**
 50 | - **typing-extensions>=4.0.0**
 51 | 
 52 | ## ⚙️ Installation & Setup
 53 | 
 54 | 
 55 | 
 56 | ### For Claude Desktop / Cursor MCP Integration
 57 | 
 58 | **Ensure that `uv` is installed.**  
 59 | If not, install it on Windows with the following PowerShell command:
 60 | 
 61 | ```powershell
 62 | powershell -ExecutionPolicy Bypass -Command "irm https://astral.sh/uv/install.ps1 | iex"
 63 | ```
 64 | 
 65 | Add this configuration to your MCP configuration file:
 66 | 
 67 | ```json
 68 | {
 69 |   "mcpServers": {
 70 |     "video_editing": {
 71 |       "command": "uvx",
 72 |       "args": [
 73 |         "--python",
 74 |         "3.11",
 75 |         "video-edit-mcp"
 76 |       ]
 77 |     }
 78 |   }
 79 | }
 80 | ```
 81 | 
 82 | **Configuration file locations:**
 83 | - **Claude Desktop (Windows)**: `%APPDATA%/Claude/claude_desktop_config.json`
 84 | - **Claude Desktop (macOS)**: `~/Library/Application Support/Claude/claude_desktop_config.json`
 85 | - **Cursor**: `.cursor/mcp.json` in your project root
 86 | 
 87 | ### Manual Installation
 88 | 
 89 | ```bash
 90 | git clone https://github.com/Aditya2755/video-edit-mcp.git
 91 | cd video-edit-mcp
 92 | pip install -r requirements.txt
 93 | pip install -e .
 94 | ```
 95 | 
 96 | ## 🏗️ Project Structure
 97 | 
 98 | ```
 99 | video_edit_mcp/
100 | ├── src/
101 | │   └── video_edit_mcp/
102 | │       ├── __init__.py
103 | │       ├── main.py                 # MCP server implementation  
104 | │       ├── video_operations.py     # Video editing tools
105 | │       ├── audio_operations.py     # Audio processing tools
106 | │       ├── download_utils.py       # Download functionality
107 | │       ├── util_tools.py          # Memory & utility tools
108 | │       ├── utils.py               # Utility functions
109 | │     
110 | ├── pyproject.toml                 # Project configuration
111 | ├── requirements.txt               # Dependencies
112 | ├── uv.lock                        # Lock file
113 | ├── LICENSE                        # MIT License
114 | ├── MANIFEST.in                    # Manifest file
115 | └── README.md
116 | ```
117 | 
118 | ## 🎯 Example Usage
119 | 
120 | ```python
121 | # Chain operations without intermediate files
122 | video_info = get_video_info("input.mp4")
123 | trimmed = trim_video("input.mp4", 10, 60, return_path=False)  # Keep in memory
124 | with_audio = add_audio(trimmed, "background.mp3", return_path=False)  
125 | final = add_text_overlay(with_audio, "Hello World", x=100, y=50, return_path=True)
126 | ```
127 | 
128 | ## 🚀 Future Enhancements & Contributions
129 | 
130 | We welcome contributions in these exciting areas:
131 | 
132 | ### 🤖 AI-Powered Features
133 | - **Speech-to-Text (STT)**: Automatic subtitle generation and transcription
134 | - **Text-to-Speech (TTS)**: AI voice synthesis for narration
135 | - **Audio Enhancement**: AI-based noise reduction and audio quality improvement
136 | - **Smart Timestamps**: Automatic scene detection and chapter generation
137 | - **Face Tracking**: Advanced face detection and tracking for automatic editing
138 | - **Object Recognition**: Track and edit based on detected objects
139 | - **Content Analysis**: AI-powered content categorization and tagging
140 | 
141 | ## 🤝 Contributing
142 | 
143 | 1. Fork the repository
144 | 2. Create a feature branch: `git checkout -b feature/amazing-feature`
145 | 3. Commit your changes: `git commit -m 'Add amazing feature'`
146 | 4. Push to the branch: `git push origin feature/amazing-feature`
147 | 5. Open a Pull Request
148 | 
149 | ## 📄 License
150 | 
151 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
152 | 
153 | ---
154 | 
155 | <div align="center">
156 | 
157 | **Made with ❤️ for the AI and multimedia editing community**
158 | 
159 | [⭐ Star this project](https://github.com/Aditya2755/video-edit-mcp) | [🤝 Contribute](https://github.com/Aditya2755/video-edit-mcp/contribute) | [📖 Documentation](https://github.com/Aditya2755/video-edit-mcp#readme)
160 | 
161 | </div>
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | moviepy==1.0.3
2 | yt-dlp>=2023.1.6
3 | mcp>=1.12.2
4 | Pillow==9.5.0
5 | 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """Video Edit MCP Server - A powerful MCP server for video editing operations using MoviePy."""
2 | 
3 | __version__ = "0.1.1"
4 | 
5 | 
6 | from .main import mcp
7 | 
8 | __all__ = ["mcp"] 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/main.py:
--------------------------------------------------------------------------------

```python
 1 | from mcp.server.fastmcp import FastMCP
 2 | import logging
 3 | from .video_operations import register_video_tools
 4 | from .audio_operations import register_audio_tools
 5 | from .download_utils import register_download_and_utility_tools
 6 | from .util_tools import register_util_tools
 7 | # Configure logging
 8 | logging.basicConfig(level=logging.INFO)
 9 | logger = logging.getLogger(__name__)
10 | 
11 | mcp = FastMCP()
12 | 
13 | # Register all tools from different modules
14 | register_video_tools(mcp)
15 | register_audio_tools(mcp)
16 | register_download_and_utility_tools(mcp)
17 | register_util_tools(mcp)
18 | 
19 | def main():
20 |     """Entry point for the MCP server"""
21 |     mcp.run(transport="streamable-http")
22 | 
23 | if __name__ == "__main__":
24 |     main()
25 | 
26 | 
27 | 
28 | 
```
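
The registration pattern used in `main.py`, where each module exposes a `register_*_tools(mcp)` function that decorates its tools onto one shared server object, can be sketched without the MCP dependency. In this sketch `ToolRegistry` is a hypothetical stand-in for `FastMCP` (it only mimics the shape of the `tool(description=...)` decorator), and `register_math_tools` is an illustrative registrar that is not part of this repository.

```python
from typing import Any, Callable, Dict


class ToolRegistry:
    """Hypothetical stand-in for FastMCP: records decorated functions by name."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def tool(self, description: str = "") -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Register under the function name and return the function unchanged.
            self.tools[func.__name__] = {"func": func, "description": description}
            return func

        return decorator


def register_math_tools(registry: ToolRegistry) -> None:
    """Illustrative registrar, analogous in shape to register_video_tools(mcp)."""

    @registry.tool(description="Add two numbers")
    def add(a: int, b: int) -> Dict[str, Any]:
        return {"success": True, "result": a + b}


# One shared registry; each module's registrar attaches its own tools to it.
registry = ToolRegistry()
register_math_tools(registry)
```

Keeping the decorated functions inside a registrar means a module can be imported without side effects, and the server object is passed in explicitly. One thing that may be worth checking in `main.py` itself: clients that launch the server as a subprocess, as the README's `uvx` configuration does, generally communicate over stdio, so `transport="stdio"` may be the setting that matches that setup.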

--------------------------------------------------------------------------------
/src/video_edit_mcp/utils.py:
--------------------------------------------------------------------------------

```python
 1 | # Simple cross-platform output directory helper
 2 | import os
 3 | from pathlib import Path
 4 | import uuid
 5 | from moviepy.editor import VideoFileClip, AudioFileClip
 6 | from PIL import Image, ImageDraw, ImageFont
 7 | import tempfile
 8 | import logging
 9 | 
10 | logger = logging.getLogger(__name__)
11 | 
12 | def get_output_path(filename: str) -> str:
13 |     """Get cross-platform output path for files"""
14 |     # Use environment variable if set, otherwise default to Downloads
15 |     output_dir = os.environ.get("VIDEO_MCP_OUTPUT_DIR", str(Path.home() / "Downloads" / "video_mcp_output"))
16 |     Path(output_dir).mkdir(parents=True, exist_ok=True)
17 |     return os.path.join(output_dir, filename)
18 | 
19 | class VideoStore:
20 |     _store = {}
21 | 
22 |     @classmethod
23 |     def store(cls, video_clip) -> str:
24 |         ref = str(uuid.uuid4())
25 |         cls._store[ref] = video_clip
26 |         return ref
27 | 
28 |     @classmethod
29 |     def load(cls, video_ref: str):
30 |         if video_ref in cls._store:
31 |             return cls._store[video_ref]
32 |         return VideoFileClip(video_ref)
33 |     
34 |     @classmethod
35 |     def clear(cls):
36 |         cls._store.clear()
37 |     
38 | class AudioStore:
39 |     _store = {}
40 | 
41 |     @classmethod
42 |     def store(cls, audio_clip) -> str:
43 |         ref = str(uuid.uuid4())
44 |         cls._store[ref] = audio_clip
45 |         return ref
46 |     
47 |     @classmethod
48 |     def load(cls, audio_ref: str):
49 |         if audio_ref in cls._store:
50 |             return cls._store[audio_ref]
51 |         return AudioFileClip(audio_ref)
52 |     
53 |     @classmethod
54 |     def clear(cls):
55 |         cls._store.clear()
56 | 
57 | 
58 | 
```
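
The reference-or-path lookup that `VideoStore` and `AudioStore` share can be sketched as a single generic class. This is a minimal sketch under stated assumptions, not the repository's implementation: `ClipStore` and `FakeClip` are hypothetical names, `FakeClip` stands in for a MoviePy clip so the example runs without MoviePy, and closing clips in `clear()` is an addition the original code does not make.

```python
import uuid
from typing import Any, Callable, Dict


class ClipStore:
    """Hypothetical generic version of VideoStore/AudioStore."""

    def __init__(self, loader: Callable[[str], Any]) -> None:
        self._store: Dict[str, Any] = {}
        self._loader = loader  # used when a ref is not in memory (e.g. a file path)

    def store(self, clip: Any) -> str:
        # Hand back an opaque reference instead of the object itself.
        ref = str(uuid.uuid4())
        self._store[ref] = clip
        return ref

    def load(self, ref: str) -> Any:
        # A known ref returns the in-memory object; anything else is treated as a path.
        if ref in self._store:
            return self._store[ref]
        return self._loader(ref)

    def clear(self) -> None:
        # Assumption: stored objects may expose close(); call it before dropping them.
        for clip in self._store.values():
            close = getattr(clip, "close", None)
            if callable(close):
                close()
        self._store.clear()


class FakeClip:
    """Illustrative stand-in for a MoviePy clip."""

    def __init__(self, source: str) -> None:
        self.source = source
        self.closed = False

    def close(self) -> None:
        self.closed = True


store = ClipStore(loader=FakeClip)
trimmed = FakeClip("trimmed-in-memory")
ref = store.store(trimmed)
```

Because `load` accepts either a stored reference or a file path, a tool can take the output of a previous step or a file on disk through the same parameter, which is what makes the chaining described in the README possible.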

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [build-system]
 2 | requires = ["hatchling"]
 3 | build-backend = "hatchling.build"
 4 | 
 5 | [project]
 6 | name = "video-edit-mcp"
 7 | version = "0.1.1"
 8 | description = "A powerful Model Context Protocol server for video editing operations using MoviePy"
 9 | readme = "README.md"
10 | license = {text = "MIT"}
11 | requires-python = ">=3.10"
12 | authors = [
13 |     {name = "Aditya_Barasiya", email = "[email protected]"},
14 | ]
15 | keywords = ["mcp", "video", "editing", "moviepy", "ai", "server", "claude", "video-processing"]
16 | classifiers = [
17 |     "Development Status :: 4 - Beta",
18 |     "Intended Audience :: Developers",
19 |     "License :: OSI Approved :: MIT License",
20 |     "Programming Language :: Python :: 3",
21 |     "Programming Language :: Python :: 3.10",
22 |     "Programming Language :: Python :: 3.11",
23 |     "Programming Language :: Python :: 3.12",
24 |     "Topic :: Software Development :: Libraries :: Python Modules",
25 |     "Topic :: Multimedia :: Video",
26 |     "Topic :: Scientific/Engineering :: Artificial Intelligence",
27 |     "Topic :: Software Development :: Libraries :: Application Frameworks",
28 | ]
29 | 
30 | dependencies = [
31 |     "mcp>=1.12.2",
32 |     "moviepy==1.0.3",
33 |     "yt-dlp>=2023.1.6",
34 |     "typing-extensions>=4.0.0",
35 |     "Pillow==9.5.0"
36 | ]
37 | 
38 | [project.optional-dependencies]
39 | dev = [
40 |     "pytest>=7.0",
41 |     "black>=22.0",
42 |     "flake8>=4.0",
43 |     "mypy>=1.0",
44 | ]
45 | 
46 | [project.urls]
47 | Homepage = "https://github.com/Aditya2755/video-edit-mcp"
48 | Documentation = "https://github.com/Aditya2755/video-edit-mcp#readme"
49 | Repository = "https://github.com/Aditya2755/video-edit-mcp.git"
50 | "Bug Tracker" = "https://github.com/Aditya2755/video-edit-mcp/issues"
51 | 
52 | [project.scripts]
53 | video-edit-mcp = "video_edit_mcp.main:main"
54 | 
55 | [tool.hatch.build.targets.wheel]
56 | packages = ["src/video_edit_mcp"]
57 | 
58 | [tool.hatch.build.targets.sdist]
59 | include = [
60 |     "/src",
61 |     "/README.md",
62 |     "/LICENSE",
63 |     "/requirements.txt",
64 | ]
65 | 
66 | [tool.black]
67 | line-length = 88
68 | target-version = ['py310'] # Corrected: Changed from 'py38' to 'py310'
69 | 
70 | [tool.mypy]
71 | python_version = "3.10" # Corrected: Changed from "3.8" to "3.10"
72 | warn_return_any = true
73 | warn_unused_configs = true
74 | disallow_untyped_defs = false
75 | 
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/util_tools.py:
--------------------------------------------------------------------------------

```python
  1 | from typing import Dict, Any
  2 | from moviepy.editor import VideoFileClip, AudioFileClip
  3 | import os
  4 | import logging
  5 | from .utils import VideoStore, AudioStore
  6 | 
  7 | logger = logging.getLogger(__name__)
  8 | 
  9 | 
 10 | def register_util_tools(mcp):
 11 |     @mcp.tool(description="Use this tool to check which video and audio objects are currently stored in memory.")
 12 |     def check_memory(store_type: str = "both") -> Dict[str, Any]:
 13 |         """
 14 |         Check what objects are stored in memory.
 15 |         
 16 |         Args:
 17 |             store_type: Type of store to check ("video", "audio", or "both")
 18 |         """
 19 |         try:
 20 |             if store_type.lower() == "video":
 21 |                 return {
 22 |                     "success": True,
 23 |                     "video_memory": list(VideoStore._store),  # refs only; clip objects are not JSON-serializable
 24 |                     "video_count": len(VideoStore._store)
 25 |                 }
 26 |             elif store_type.lower() == "audio":
 27 |                 return {
 28 |                     "success": True,
 29 |                     "audio_memory": list(AudioStore._store),  # refs only; clip objects are not JSON-serializable
 30 |                     "audio_count": len(AudioStore._store)
 31 |                 }
 32 |             else:  # both or any other value
 33 |                 return {
 34 |                     "success": True,
 35 |                     "video_memory": list(VideoStore._store),  # refs only; clip objects are not JSON-serializable
 36 |                     "audio_memory": list(AudioStore._store),
 37 |                     "video_count": len(VideoStore._store),
 38 |                     "audio_count": len(AudioStore._store),
 39 |                     "total_objects": len(VideoStore._store) + len(AudioStore._store)
 40 |                 }
 41 |         except Exception as e:
 42 |             logger.error(f"Error checking memory: {e}")
 43 |             return {
 44 |                 "success": False,
 45 |                 "error": str(e),
 46 |                 "error_type": type(e).__name__,
 47 |                 "message": "Error checking memory"
 48 |             }
 49 | 
 50 |     @mcp.tool(description="Use this tool for clearing all stored video and audio objects from memory to free up space")
 51 |     def clear_memory(clear_videos: bool, clear_audios: bool) -> Dict[str, Any]:
 52 |         try:
 53 |             if clear_videos:
 54 |                 VideoStore.clear()
 55 |             if clear_audios:
 56 |                 AudioStore.clear()
 57 |             return {
 58 |                 "success": True,
 59 |                 "message": f"Memory cleared - Videos: {clear_videos}, Audios: {clear_audios}"
 60 |             }
 61 |         except Exception as e:
 62 |             logger.error(f"Error clearing memory: {e}")
 63 |             return {
 64 |                 "success": False,
 65 |                 "error": str(e),
 66 |                 "error_type": type(e).__name__,
 67 |                 "message": "Error clearing memory"
 68 |             }
 69 | 
 70 |     @mcp.tool(description="Use this tool for listing files in a directory, provide directory path")
 71 |     def list_files(directory_path: str) -> Dict[str, Any]:
 72 |         try:
 73 |             if not os.path.isdir(directory_path):
 74 |                 return {
 75 |                     "success": False,
 76 |                     "error": "Directory does not exist",
 77 |                     "message": "Provide valid directory path"
 78 |                 }
 79 |             files = os.listdir(directory_path)
 80 |             return {
 81 |                 "success": True,
 82 |                 "files": files
 83 |             }
 84 |         except Exception as e:
 85 |             return {
 86 |                 "success": False,
 87 |                 "error": str(e),
 88 |                 "message": "Error listing files"
 89 |             }
 90 |         
 91 |     @mcp.tool(description="Use this tool to create a directory to store output files, make sure to provide accurate path")
 92 |     def make_directory(directory_path: str) -> Dict[str, Any]:
 93 |         try:
 94 |             os.makedirs(directory_path, exist_ok=True)
 95 |             return {
 96 |                 "success": True,
 97 |                 "message": "Directory created successfully"
 98 |             }
 99 |         except Exception as e:
100 |             return {
101 |                 "success": False,
102 |                 "error": str(e),
103 |                 "message": "Error creating directory"
104 |             } 
```
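
Every tool above repeats the same `try`/`except` block to produce a `success`/`error`/`error_type`/`message` dictionary. One way to factor that out is a decorator, sketched below. `tool_result` is a hypothetical helper that does not exist in this repository, and the `list_files` shown here is a simplified stand-in for the tool of the same name.

```python
import functools
import logging
import os
import tempfile
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


def tool_result(failure_message: str) -> Callable[[Callable[..., Dict[str, Any]]], Callable[..., Dict[str, Any]]]:
    """Hypothetical decorator: wrap a tool so exceptions become an error dict."""

    def decorator(func: Callable[..., Dict[str, Any]]) -> Callable[..., Dict[str, Any]]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            try:
                # On success, merge the tool's payload into the standard envelope.
                return {"success": True, **func(*args, **kwargs)}
            except Exception as e:
                logger.error(f"{failure_message}: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "message": failure_message,
                }

        return wrapper

    return decorator


@tool_result("Error listing files")
def list_files(directory_path: str) -> Dict[str, Any]:
    # Simplified stand-in: only the payload is returned; the decorator adds the rest.
    return {"files": sorted(os.listdir(directory_path))}


# Exercise both the success path and the failure path in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, "a.txt"), "w").close()
    ok = list_files(tmp)
    failed = list_files(os.path.join(tmp, "missing"))
```

With this shape, each tool body contains only its own logic, and the error fields stay consistent across tools (the original `list_files` and `make_directory` omit `error_type`, for instance).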

--------------------------------------------------------------------------------
/src/video_edit_mcp/download_utils.py:
--------------------------------------------------------------------------------

```python
  1 | import yt_dlp
  2 | import os
  3 | import logging
  4 | from typing import Dict, Any, Optional, List
  5 | from .utils import VideoStore, AudioStore
  6 | 
  7 | logger = logging.getLogger(__name__)
  8 | 
  9 | def get_default_download_paths():
 10 |     """Get common download directory paths"""
 11 |     user_home = os.path.expanduser("~")
 12 |     return {
 13 |         "user_downloads": os.path.join(user_home, "Downloads"),
 14 |         "desktop": os.path.join(user_home, "Desktop"), 
 15 |         "documents": os.path.join(user_home, "Documents"),
 16 |         "project_root": os.path.abspath(".") if "video_edit_mcp" in os.path.abspath(".") else None
 17 |     }
 18 | 
 19 | def register_download_and_utility_tools(mcp):
 20 |     """Register all download and utility tools with the MCP server"""
 21 |     @mcp.tool(description="Use this tool to download a video or its audio. Provide a full path for saving, not just a file name.")
 22 |     def download_video(
 23 |         url: str,
 24 |         save_path: Optional[str] = None,
 25 |         audio_only: bool = False,
 26 |         **yt_dlp_options: Any
 27 |     ) -> Dict[str, Any]:
 28 |         """
 29 |         Download video or audio from URL using yt-dlp
 30 |         
 31 |         Args:
 32 |             url: URL to download from
 33 |             save_path: Directory path or full file path template where to save
 34 |             audio_only: If True, download and extract audio only
 35 |             **yt_dlp_options: Additional yt-dlp options
 36 |             
 37 |         Returns:
 38 |             Dict with success status, file path, and other info
 39 |         """
 40 |         try:
 41 |             # Start with user-provided yt-dlp options
 42 |             ydl_opts = yt_dlp_options.copy()
 43 |             
 44 |             # --- Handle Core Parameters ---
 45 |             
 46 |             # 1. Set output path template if not already specified by user
 47 |             if 'outtmpl' not in ydl_opts:
 48 |                 if save_path:
 49 |                     # Check if user provided a relative path that might cause confusion
 50 |                     if not os.path.isabs(save_path) and not save_path.startswith('.'):
 51 |                         logger.warning(f"Relative path detected: '{save_path}'. This will save relative to current directory: {os.getcwd()}")
 52 |                         
 53 |                     # Convert save_path to absolute path first
 54 |                     save_path = os.path.abspath(save_path)
 55 |                     
 56 |                     if os.path.isdir(save_path):
 57 |                         # save_path is a directory
 58 |                         ydl_opts['outtmpl'] = os.path.join(save_path, '%(title)s [%(id)s].%(ext)s')
 59 |                     else:
 60 |                         # save_path could be a full path or template
 61 |                         # Check if it contains yt-dlp template variables
 62 |                         if '%(' in save_path:
 63 |                             ydl_opts['outtmpl'] = save_path
 64 |                         else:
 65 |                             # Treat as full file path, ensure directory exists
 66 |                             directory = os.path.dirname(save_path)
 67 |                             if directory and not os.path.exists(directory):
 68 |                                 os.makedirs(directory, exist_ok=True)
 69 |                             ydl_opts['outtmpl'] = save_path
 70 |                 else:
 71 |                     # Default to user's Downloads folder instead of current directory
 72 |                     default_paths = get_default_download_paths()
 73 |                     downloads_dir = default_paths["user_downloads"]
 74 |                     
 75 |                     # Create Downloads directory if it doesn't exist
 76 |                     os.makedirs(downloads_dir, exist_ok=True)
 77 |                     ydl_opts['outtmpl'] = os.path.join(downloads_dir, '%(title)s [%(id)s].%(ext)s')
 78 | 
 79 |             # 2. Configure for audio-only downloads
 80 |             if audio_only:
 81 |                 # Set format to best audio
 82 |                 ydl_opts['format'] = 'bestaudio/best'
 83 |                 # Ensure post-processor is set to extract audio (defaults to mp3)
 84 |                 if 'postprocessors' not in ydl_opts:
 85 |                     ydl_opts['postprocessors'] = []
 86 |                 
 87 |                 # Add audio extraction post-processor
 88 |                 audio_postprocessor = {
 89 |                     'key': 'FFmpegExtractAudio',
 90 |                     'preferredcodec': ydl_opts.get('audio_format', 'mp3'),
 91 |                     'preferredquality': ydl_opts.get('audio_quality', '192'),
 92 |                 }
 93 |                 ydl_opts['postprocessors'].append(audio_postprocessor)
 94 | 
 95 |             # Add logger
 96 |             ydl_opts['logger'] = logger
 97 | 
 98 |             # --- Execute Download ---
 99 |             with yt_dlp.YoutubeDL(ydl_opts) as ydl:
100 |                 # Extract info and download
101 |                 info = ydl.extract_info(url, download=not ydl_opts.get('simulate', False))
102 |                 
103 |                 # Prepare response
104 |                 response = {
105 |                     "success": True,
106 |                     "message": "Operation successful!",
107 |                     "title": info.get('title', 'Unknown'),
108 |                     "id": info.get('id', 'Unknown'),
109 |                     "duration": info.get('duration'),
110 |                     "uploader": info.get('uploader')
111 |                 }
112 |                 
113 |                 # If not simulating, get the actual file path
114 |                 if not ydl_opts.get('simulate', False):
115 |                     # Get the base filename that yt-dlp would use
116 |                     base_filepath = ydl.prepare_filename(info)
117 |                     
118 |                     # Convert to absolute path
119 |                     base_filepath = os.path.abspath(base_filepath)
120 |                     
121 |                     # For audio-only downloads, the file extension will change
122 |                     if audio_only:
123 |                         # Find the audio codec used
124 |                         audio_codec = ydl_opts.get('audio_format', 'mp3')
125 |                         # Change extension to match the extracted audio format
126 |                         base_name = os.path.splitext(base_filepath)[0]
127 |                         actual_filepath = f"{base_name}.{audio_codec}"
128 |                     else:
129 |                         actual_filepath = base_filepath
130 |                     
131 |                     # Ensure we have absolute path
132 |                     actual_filepath = os.path.abspath(actual_filepath)
133 |                     
134 |                     # Verify the file exists
135 |                     if os.path.exists(actual_filepath):
136 |                         response.update({
137 |                             "filepath": actual_filepath,
138 |                             "filename": os.path.basename(actual_filepath),
139 |                             "directory": os.path.dirname(actual_filepath),
140 |                             "file_size": os.path.getsize(actual_filepath)
141 |                         })
142 |                     else:
143 |                         # If exact path doesn't exist, try to find the actual file
144 |                         # This handles cases where yt-dlp might have modified the filename
145 |                         directory = os.path.dirname(actual_filepath)
146 |                         base_name = os.path.splitext(os.path.basename(actual_filepath))[0]
147 |                         
148 |                         # Look for files with similar names in the directory
149 |                         if os.path.exists(directory):
150 |                             found_file = None
151 |                             for file in os.listdir(directory):
152 |                                 if base_name in file or (info.get('id') and info['id'] in file):
153 |                                     found_file = file
154 |                                     break
155 |                             
156 |                             if found_file:
157 |                                 found_filepath = os.path.abspath(os.path.join(directory, found_file))
158 |                                 response.update({
159 |                                     "filepath": found_filepath,
160 |                                     "filename": found_file,
161 |                                     "directory": os.path.dirname(found_filepath),
162 |                                     "file_size": os.path.getsize(found_filepath),
163 |                                     "note": "Filename was modified during download"
164 |                                 })
165 |                             else:
166 |                                 response.update({
167 |                                     "filepath": actual_filepath,
168 |                                     "filename": os.path.basename(actual_filepath),
169 |                                     "directory": os.path.dirname(actual_filepath),
170 |                                     "warning": "File path expected but not found at exact location",
171 |                                     "expected_path": actual_filepath
172 |                                 })
173 |                         else:
174 |                             response.update({
175 |                                 "filepath": actual_filepath,
176 |                                 "filename": os.path.basename(actual_filepath),
177 |                                 "directory": os.path.dirname(actual_filepath),
178 |                                 "warning": "Directory not found",
179 |                                 "expected_path": actual_filepath
180 |                             })
181 |                 else:
182 |                     # Simulation mode
183 |                     simulated_path = ydl.prepare_filename(info)
184 |                     response["simulated_filepath"] = os.path.abspath(simulated_path)
185 |                     response["message"] = "Simulation successful - no file downloaded"
186 | 
187 |                 return response
188 |                 
189 |         except Exception as e:
190 |             logger.error(f"Error during video download from {url}: {e}", exc_info=True)
191 |             return {
192 |                 "success": False, 
193 |                 "error": str(e), 
194 |                 "error_type": type(e).__name__,
195 |                 "url": url,
196 |                 "message": "Download failed"
197 |             }
198 | 
199 |     @mcp.tool(description="Get suggested download directory paths for saving videos/audio files")
200 |     def get_download_paths() -> Dict[str, Any]:
201 |         """
202 |         Get suggested common download directory paths
203 |         
204 |         Returns:
205 |             Dict with suggested download paths
206 |         """
207 |         try:
208 |             paths = get_default_download_paths()
209 |             current_dir = os.getcwd()
210 |             
211 |             return {
212 |                 "success": True,
213 |                 "current_working_directory": current_dir,
214 |                 "suggested_paths": {
215 |                     "user_downloads": paths["user_downloads"],
216 |                     "desktop": paths["desktop"],
217 |                     "documents": paths["documents"],
218 |                     "project_root": paths["project_root"],
219 |                     "current_directory": current_dir
220 |                 },
221 |                 "usage_examples": {
222 |                     "save_to_downloads": paths["user_downloads"],
223 |                     "save_to_custom_folder": os.path.join(paths["user_downloads"], "YouTube_Videos"),
224 |                     "save_to_desktop": paths["desktop"]
225 |                 },
226 |                 "note": "Use full absolute paths to avoid saving in unexpected locations"
227 |             }
228 |         except Exception as e:
229 |             logger.error(f"Error getting download paths: {e}")
230 |             return {
231 |                 "success": False,
232 |                 "error": str(e),
233 |                 "current_directory": os.getcwd(),
234 |                 "message": "Error getting download paths"
235 |             }
236 | 
237 |     
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/audio_operations.py:
--------------------------------------------------------------------------------

```python
  1 | from moviepy.editor import *
  2 | from moviepy.audio.fx.all import audio_loop
  3 | from moviepy.editor import CompositeAudioClip
  4 | from typing import Dict, Any, List
  5 | import os
  6 | import logging
  7 | from .utils import get_output_path, AudioStore
  8 | 
  9 | logger = logging.getLogger(__name__)
 10 | 
 11 | def register_audio_tools(mcp):
 12 |     """Register all audio processing tools with the MCP server"""
 13 |     
 14 |     @mcp.tool(description="Get audio info (duration, fps, channels) for an audio file")
 15 |     def audio_info(audio_path:str) -> Dict[str,Any]:
 16 |         try:
 17 |             
 18 |             audio = AudioFileClip(audio_path)
 19 |             return{
 20 |                 "success": True,
 21 |                 "audio_info": {
 22 |                     "duration": audio.duration,
 23 |                     "fps": audio.fps,
 24 |                     "channels": audio.nchannels
 25 |                 }
 26 |             }
 27 |         except Exception as e:
 28 |             logger.error(f"Error getting audio info for {audio_path}: {e}")
 29 |             return {
 30 |                 "success": False,
 31 |                 "error": str(e),
 32 |                 "error_type": type(e).__name__
 33 |             }
 34 | 
 35 |     @mcp.tool(description="Use this tool to extract audio from a video. Provide only an output file name such as extracted_audio.mp3 (not a path), choosing a meaningful name based on the audio. If further steps follow the extraction, set return_path to false so an object reference is returned; otherwise set return_path to true.")
 36 |     def extract_audio(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
 37 |         try:
 38 |             output_path = get_output_path(output_name)
 39 |             from .utils import VideoStore
 40 |             video = VideoStore.load(video_path)
 41 |             
 42 |             if video.audio is None:
 43 |                 return {
 44 |                     "success": False,
 45 |                     "error": "Video has no audio track",
 46 |                     "message": "No audio to extract"
 47 |                 }
 48 |             
 49 |             audio = video.audio
 50 |             if return_path:
 51 |                 audio.write_audiofile(output_path)
 52 |                 return {
 53 |                     "success": True,
 54 |                     "output_path": output_path,
 55 |                     "message": "Audio extracted successfully"
 56 |                 }
 57 |             else:
 58 |                 ref = AudioStore.store(audio)
 59 |                 return {
 60 |                     "success": True,
 61 |                     "output_object": ref
 62 |                 }
 63 |         except Exception as e:
 64 |             logger.error(f"Error extracting audio from {video_path}: {e}")
 65 |             return {
 66 |                 "success": False,
 67 |                 "error": str(e),
 68 |                 "error_type": type(e).__name__,
 69 |                 "message": "Error extracting audio"
 70 |             }
 71 | 
 72 |     @mcp.tool(description="Use this tool to trim an audio file. Provide valid start and end times and only an output file name such as trimmed_audio.mp3 (not a path), choosing a meaningful name based on the audio. If further steps follow the trimming, set return_path to false so an object reference is returned; otherwise set return_path to true.")
 73 |     def trim_audio(audio_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]:
 74 |         try:
 75 |             # Input validation
 76 |             if start_time < 0 or end_time < 0:
 77 |                 return {
 78 |                     "success": False,
 79 |                     "error": "Start and end times must be non-negative",
 80 |                     "message": "Invalid time parameters"
 81 |                 }
 82 |             if start_time >= end_time:
 83 |                 return {
 84 |                     "success": False,
 85 |                     "error": "Start time must be less than end time",
 86 |                     "message": "Invalid time range"
 87 |                 }
 88 |             
 89 |             output_path = get_output_path(output_name)
 90 |             audio = AudioStore.load(audio_path)
 91 |             trimmed_audio = audio.subclip(start_time, end_time)
 92 |             
 93 |             if return_path:
 94 |                 trimmed_audio.write_audiofile(output_path)
 95 |                 return {
 96 |                     "success": True,
 97 |                     "output_path": output_path,
 98 |                     "message": "Audio trimmed successfully"
 99 |                 }
100 |             else:
101 |                 ref = AudioStore.store(trimmed_audio)
102 |                 return {
103 |                     "success": True,
104 |                     "output_object": ref,
105 |                     "message": "Audio trimmed successfully"
106 |                 }
107 |         except Exception as e:
108 |             logger.error(f"Error trimming audio {audio_path}: {e}")
109 |             return {
110 |                 "success": False,
111 |                 "error": str(e),
112 |                 "error_type": type(e).__name__,
113 |                 "message": "Error trimming audio"
114 |             }
115 | 
116 |     @mcp.tool(description="Use this tool to concatenate two audio files. If further steps follow the concatenation, set return_path to false so an object reference is returned; otherwise set return_path to true.")
117 |     def concatenate_audio(audio_path_1: str, audio_path_2: str, output_name: str, return_path: bool) -> Dict[str, Any]:
118 |         try:
119 |             output_path = get_output_path(output_name)
120 |             audio_1 = AudioStore.load(audio_path_1)
121 |             audio_2 = AudioStore.load(audio_path_2)
122 |             concatenated_audio = concatenate_audioclips([audio_1, audio_2])
123 |             
124 |             if return_path:
125 |                 concatenated_audio.write_audiofile(output_path)
126 |                 return {
127 |                     "success": True,
128 |                     "output_path": output_path,
129 |                     "message": "Audio concatenated successfully"
130 |                 }
131 |             else:
132 |                 ref = AudioStore.store(concatenated_audio)
133 |                 return {
134 |                     "success": True,
135 |                     "output_object": ref,
136 |                     "message": "Audio concatenated successfully"
137 |                 }
138 |         except Exception as e:
139 |             logger.error(f"Error concatenating audio files {audio_path_1} and {audio_path_2}: {e}")
140 |             return {
141 |                 "success": False,
142 |                 "error": str(e),
143 |                 "error_type": type(e).__name__,
144 |                 "message": "Error concatenating audio files"
145 |             }
146 | 
147 |     @mcp.tool(description="Use this tool to loop audio. Provide the duration in seconds and only an output file name such as looped_audio.mp3 (not a path), choosing a meaningful name based on the audio. If further steps follow the looping, set return_path to false so an object reference is returned; otherwise set return_path to true.")
148 |     def loop_audio(audio_path: str, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
149 |         try:
150 |             # Input validation
151 |             if duration <= 0:
152 |                 return {
153 |                     "success": False,
154 |                     "error": "Duration must be positive",
155 |                     "message": "Invalid duration parameter"
156 |                 }
157 |             
158 |             output_path = get_output_path(output_name)
159 |             audio = AudioStore.load(audio_path)
160 |             looped_audio = audio_loop(audio, duration=duration)
161 |             
162 |             if return_path:
163 |                 looped_audio.write_audiofile(output_path)
164 |                 return {
165 |                     "success": True,
166 |                     "output_path": output_path,
167 |                     "message": "Audio looped successfully"
168 |                 }
169 |             else:
170 |                 ref = AudioStore.store(looped_audio)
171 |                 return {
172 |                     "success": True,
173 |                     "output_object": ref,
174 |                     "message": "Audio looped successfully"
175 |                 }
176 |         except Exception as e:
177 |             logger.error(f"Error looping audio {audio_path}: {e}")
178 |             return {
179 |                 "success": False,
180 |                 "error": str(e),
181 |                 "error_type": type(e).__name__,
182 |                 "message": "Error looping audio"
183 |             }
184 | 
185 |     @mcp.tool(description="Use this tool to adjust the volume of an audio file. If further steps follow the volume adjustment, set return_path to false so an object reference is returned; otherwise set return_path to true.")
186 |     def adjust_vol(audio_path: str, volume_level: float, output_name: str, return_path: bool) -> Dict[str, Any]:
187 |         try:
188 |             # Input validation
189 |             if volume_level <= 0:
190 |                 return {
191 |                     "success": False,
192 |                     "error": "Volume level must be positive (e.g., 1.0 for normal, 2.0 for double)",
193 |                     "message": "Invalid volume level parameter"
194 |                 }
195 |             
196 |             output_path = get_output_path(output_name)
197 |             audio = AudioStore.load(audio_path)
198 |             audio_adjusted = audio.volumex(volume_level)
199 |             
200 |             if return_path:
201 |                 audio_adjusted.write_audiofile(output_path)
202 |                 return {
203 |                     "success": True,
204 |                     "output_path": output_path,
205 |                     "message": "Audio volume adjusted successfully"
206 |                 }
207 |             else:
208 |                 ref = AudioStore.store(audio_adjusted)
209 |                 return {
210 |                     "success": True,
211 |                     "output_object": ref,
212 |                     "message": "Audio volume adjusted successfully"
213 |                 }
214 |         except Exception as e:
215 |             logger.error(f"Error adjusting audio volume {audio_path}: {e}")
216 |             return {
217 |                 "success": False,
218 |                 "error": str(e),
219 |                 "error_type": type(e).__name__,
220 |                 "message": "Error adjusting audio volume"
221 |             }
222 | 
223 |     @mcp.tool(description="Use this tool to add a fade-in effect to audio. Provide fade_duration in seconds and an output name such as fadein_audio.mp3. If further steps follow the fade-in, set return_path to false so an object reference is returned; otherwise set return_path to true.")
224 |     def fadein_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]:
225 |         try:
226 |             output_path = get_output_path(output_name)
227 |             audio = AudioStore.load(audio_path)
228 |             faded_audio = audio.audio_fadein(fade_duration)
229 |             if return_path:
230 |                 faded_audio.write_audiofile(output_path)
231 |                 return {
232 |                     "success": True,
233 |                     "output_path": output_path,
234 |                     "message": "Audio fade in effect added successfully"
235 |                 }
236 |             else:
237 |                 ref = AudioStore.store(faded_audio)
238 |                 return {
239 |                     "success": True,
240 |                     "output_object": ref
241 |                 }
242 |         except Exception as e:
243 |             logger.error(f"Error adding audio fade in effect {audio_path}: {e}")
244 |             return {
245 |                 "success": False,
246 |                 "error": str(e),
247 |                 "error_type": type(e).__name__,
248 |                 "message": "Error adding audio fade in effect"
249 |             }
250 | 
251 |     @mcp.tool(description="Use this tool to add a fade-out effect to audio. Provide fade_duration in seconds and an output name such as fadeout_audio.mp3. If further steps follow the fade-out, set return_path to false so an object reference is returned; otherwise set return_path to true.")
252 |     def fadeout_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]:
253 |         try:
254 |             output_path = get_output_path(output_name)
255 |             audio = AudioStore.load(audio_path)
256 |             faded_audio = audio.audio_fadeout(fade_duration)
257 |             if return_path:
258 |                 faded_audio.write_audiofile(output_path)
259 |                 return {
260 |                     "success": True,
261 |                     "output_path": output_path,
262 |                     "message": "Audio fade out effect added successfully"
263 |                 }
264 |             else:
265 |                 ref = AudioStore.store(faded_audio)
266 |                 return {
267 |                     "success": True,
268 |                     "output_object": ref
269 |                 }
270 |         except Exception as e:
271 |             logger.error(f"Error adding audio fade out effect {audio_path}: {e}")
272 |             return {
273 |                 "success": False,
274 |                 "error": str(e),
275 |                 "error_type": type(e).__name__,
276 |                 "message": "Error adding audio fade out effect"
277 |             }
278 | 
279 |     @mcp.tool(description="Use this tool to mix multiple audio tracks together. Provide a list of audio paths and an output name. If further steps follow the mixing, set return_path to false so an object reference is returned; otherwise set return_path to true.")
280 |     def mix_audio_tracks(audio_paths:List[str], output_name:str, return_path:bool) -> Dict[str,Any]:
281 |         try:
282 |             output_path = get_output_path(output_name)
283 |             audio_clips = [AudioStore.load(clips) for clips in audio_paths]
284 |             mixed_audio = CompositeAudioClip(audio_clips)
285 |             mixed_audio.fps = 44100
286 |             if return_path:
287 |                 try:
288 |                     mixed_audio.write_audiofile(output_path)
289 |                     return {
290 |                         "success": True,
291 |                         "output_path": output_path,
292 |                         "message": "Audio tracks mixed successfully"
293 |                     }
294 |                 except Exception as write_error:
295 |                     return {
296 |                         "success": False,
297 |                         "error": f"Failed to write file: {str(write_error)}",
298 |                         "message": "File writing failed"
299 |                     }
300 |             else:
301 |                 return {
302 |                     "success": True,
303 |                     "output_object": AudioStore.store(mixed_audio)  # reference, not the raw clip
304 |                 }
305 | 
306 |         except Exception as e:
307 |             return {
308 |                 "success": False,
309 |                 "error": f"Unexpected error: {str(e)}",
310 |                 "error_type": type(e).__name__,
311 |                 "message": "Unexpected error occurred"
312 |             } 
```
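
The audio tools above share one convention: with `return_path` set to true a tool writes a file and returns its path, and with `return_path` set to false it stores the intermediate result and returns a reference that the next tool can load. The following is a minimal, self-contained sketch of that chaining idea only. `ClipRegistry` and `trim_samples` are hypothetical names invented for illustration, plain Python lists stand in for audio clips, and the real `AudioStore` in `utils.py` may work differently.

```python
import uuid
from typing import Any, Dict


class ClipRegistry:
    """Hypothetical in-memory store mapping opaque reference strings to objects."""

    _items: Dict[str, Any] = {}

    @classmethod
    def store(cls, obj: Any) -> str:
        # Hand back a short opaque reference instead of the object itself,
        # so the tool response stays JSON-serializable.
        ref = f"ref-{uuid.uuid4().hex}"
        cls._items[ref] = obj
        return ref

    @classmethod
    def load(cls, ref: str) -> Any:
        # This sketch only resolves stored references; it does not read files.
        return cls._items[ref]


def trim_samples(source_ref: str, start: int, end: int, return_path: bool) -> Dict[str, Any]:
    """Toy 'tool' that trims a list of samples instead of a real audio clip."""
    if start < 0 or end < 0:
        return {"success": False, "error": "Start and end must be non-negative"}
    if start >= end:
        return {"success": False, "error": "Start must be less than end"}

    trimmed = ClipRegistry.load(source_ref)[start:end]
    if return_path:
        # Final step: a real tool would write a file here and return its path.
        # The sketch returns the data directly as a stand-in.
        return {"success": True, "result": trimmed}
    # Intermediate step: keep the result in memory and return a reference.
    return {"success": True, "output_object": ClipRegistry.store(trimmed)}


# Chain two steps: the first returns a reference, the second consumes it.
source = ClipRegistry.store(list(range(10)))
first = trim_samples(source, 2, 8, return_path=False)
second = trim_samples(first["output_object"], 1, 3, return_path=True)
```

Returning a reference for intermediate steps avoids encoding a file after every operation; only the last tool in a chain needs to produce output on disk.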

--------------------------------------------------------------------------------
/src/video_edit_mcp/video_operations.py:
--------------------------------------------------------------------------------

```python
  1 | from moviepy.editor import *
  2 | from moviepy.video.fx import *
  3 | from moviepy.video.fx.speedx import speedx
  4 | from moviepy.video.fx.rotate import rotate
  5 | from moviepy.video.fx.crop import crop
  6 | from moviepy.video.fx.resize import resize
  7 | from moviepy.video.fx.fadein import fadein
  8 | from moviepy.video.fx.fadeout import fadeout
  9 | from moviepy.video.fx.blackwhite import blackwhite
 10 | from moviepy.video.fx.mirror_x import mirror_x
 11 | from moviepy.editor import ImageClip, CompositeVideoClip, ImageSequenceClip, TextClip
 12 | from typing import Dict, Any, Optional, List, Tuple
 13 | import os
 14 | import logging
 15 | import imageio
 16 | from .utils import get_output_path, VideoStore, AudioStore
 17 | import moviepy.config as mpy_conf
 18 | 
 19 | 
 20 | logger = logging.getLogger(__name__)
 21 | 
 22 | def register_video_tools(mcp):
 23 |     """Register all video processing tools with the MCP server"""
 24 |     
 25 |     @mcp.tool()
 26 |     def get_video_info(video_path: str) -> Dict[str, Any]:
 27 |         """Get comprehensive information about a video file including duration, fps, resolution, codec details, and audio information."""
 28 |         try:
 29 |             # Load video file
 30 |             video = VideoFileClip(video_path)
 31 |             
 32 |             # Basic video information
 33 |             info = {
 34 |                 "file_path": video_path,
 35 |                 "filename": os.path.basename(video_path),
 36 |                 "duration": video.duration,
 37 |                 "fps": video.fps,
 38 |                 "size": video.size,  # (width, height)
 39 |                 "width": video.w,
 40 |                 "height": video.h,
 41 |                 "aspect_ratio": round(video.w / video.h, 2) if video.h > 0 else None,
 42 |             }
 43 |             
 44 |             # Add reader/codec information if available
 45 |             if hasattr(video, 'reader') and video.reader:
 46 |                 reader_info = {
 47 |                     "nframes": getattr(video.reader, 'nframes', None),
 48 |                     "bitrate": getattr(video.reader, 'bitrate', None),
 49 |                     "codec": getattr(video.reader, 'codec', None),
 50 |                     "pix_fmt": getattr(video.reader, 'pix_fmt', None),
 51 |                 }
 52 |                 # Only add non-None values
 53 |                 info.update({k: v for k, v in reader_info.items() if v is not None})
 54 |             
 55 |             # Audio information
 56 |             if video.audio is not None:
 57 |                 audio_info = {
 58 |                     "has_audio": True,
 59 |                     "audio_duration": video.audio.duration,
 60 |                     "audio_fps": video.audio.fps,
 61 |                     "audio_channels": getattr(video.audio, 'nchannels', None),
 62 |                 }
 63 |                 
 64 |                 # Add audio reader info if available
 65 |                 if hasattr(video.audio, 'reader') and video.audio.reader:
 66 |                     audio_reader_info = {
 67 |                         "audio_bitrate": getattr(video.audio.reader, 'bitrate', None),
 68 |                         "audio_codec": getattr(video.audio.reader, 'codec', None),
 69 |                         "sample_rate": getattr(video.audio.reader, 'fps', None),
 70 |                     }
 71 |                     # Only add non-None values
 72 |                     audio_info.update({k: v for k, v in audio_reader_info.items() if v is not None})
 73 |             else:
 74 |                 audio_info = {
 75 |                     "has_audio": False,
 76 |                     "audio_duration": None,
 77 |                     "audio_fps": None,
 78 |                     "audio_channels": None,
 79 |                 }
 80 |             
 81 |             info.update(audio_info)
 82 |             
 83 |             # File size information
 84 |             try:
 85 |                 file_size = os.path.getsize(video_path)
 86 |                 info["file_size_bytes"] = file_size
 87 |                 info["file_size_mb"] = round(file_size / (1024 * 1024), 2)
 88 |             except OSError:
 89 |                 info["file_size_bytes"] = None
 90 |                 info["file_size_mb"] = None
 91 |             
 92 |             # Calculate video quality metrics
 93 |             if info["duration"] and info["duration"] > 0:
 94 |                 info["total_frames"] = int(info["fps"] * info["duration"]) if info["fps"] else None
 95 |                 if info.get("file_size_bytes"):
 96 |                     info["average_bitrate_kbps"] = round((info["file_size_bytes"] * 8) / (info["duration"] * 1000), 2)
 97 |             
 98 |             # Clean up video object to prevent memory leaks
 99 |             video.close()
100 |             
101 |             return {
102 |                 "success": True,
103 |                 "video_info": info
104 |             }
105 |             
106 |         except Exception as e:
107 |             logger.error(f"Error getting video info for {video_path}: {e}")
108 |             return {
109 |                 "success": False,
110 |                 "error": str(e),
111 |                 "error_type": type(e).__name__
112 |             }
113 |         finally:
114 |             # Ensure video object is cleaned up even if an exception occurs
115 |             try:
116 |                 if 'video' in locals():
117 |                     video.close()
118 |             except Exception:
119 |                 pass
120 | 
121 |     @mcp.tool(description="Use this tool to trim a video. Provide start and end times in seconds and an output name such as trimmed_video.mp4. If further steps follow the trimming, set return_path to false so an object reference is returned; otherwise set return_path to true.")
122 |     def trim_video(video_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]:
123 |         try:
124 |             # Input validation
125 |             if start_time < 0 or end_time < 0:
126 |                 return {
127 |                     "success": False,
128 |                     "error": "Start and end times must be non-negative",
129 |                     "message": "Invalid time parameters"
130 |                 }
131 |             if start_time >= end_time:
132 |                 return {
133 |                     "success": False,
134 |                     "error": "Start time must be less than end time",
135 |                     "message": "Invalid time range"
136 |                 }
137 |             
138 |             output_path = get_output_path(output_name)
139 |             video = VideoStore.load(video_path)
140 |             trimmed_video = video.subclip(start_time, end_time)
141 | 
142 |             if return_path:
143 |                 trimmed_video.write_videofile(output_path)
144 |                 return {
145 |                     "success": True,
146 |                     "output_path": output_path,
147 |                     "message": "Video trimmed successfully"
148 |                 }
149 |             else:
150 |                 ref = VideoStore.store(trimmed_video)
151 |                 return {
152 |                     "success": True,
153 |                     "output_object": ref
154 |                 }
155 |         except Exception as e:
156 |             logger.error(f"Error trimming video {video_path}: {e}")
157 |             return {
158 |                 "success": False,
159 |                 "error": str(e),
160 |                 "error_type": type(e).__name__,
161 |                 "message": "Error trimming video"
162 |             }
163 | 
164 |     @mcp.tool(description="Use this tool to merge two videos. Provide two video paths and an output name such as merged_video.mp4. If further steps follow the merge, set return_path to false so an object reference is returned; otherwise set return_path to true.")
165 |     def merge_video(video_path: str, video_path2: str, output_name: str, return_path: bool) -> Dict[str, Any]:
166 |         """Merge two videos into one."""
167 |         try:
168 |             output_path = get_output_path(output_name)
169 |             video1 = VideoStore.load(video_path)
170 |             video2 = VideoStore.load(video_path2)
171 |             merged_video = concatenate_videoclips([video1, video2])
172 |             if return_path:
173 |                 merged_video.write_videofile(output_path)
174 |                 return {
175 |                     "success": True,
176 |                     "output_path": output_path,
177 |                     "message": "Videos merged successfully"
178 |                 }
179 |             else:
180 |                 ref = VideoStore.store(merged_video)
181 |                 return {
182 |                     "success": True,
183 |                     "output_object": ref,
184 |                     "message": "Videos merged successfully"
185 |                 }
186 |         except Exception as e:
187 |             logger.error(f"Error merging videos {video_path} and {video_path2}: {e}")
188 |             return {
189 |                 "success": False,
190 |                 "error": str(e),
191 |                 "error_type": type(e).__name__,
192 |                 "message": "Error merging videos"
193 |             }
194 | 
195 |  
196 | 
197 |     @mcp.tool(description="Use this tool to resize a video. First decide whether the video should be saved directly or returned as an object for further processing: if further steps follow the resizing, set return_path to false so an object reference is returned; otherwise set return_path to true.")
198 |     def resize_video(video_path: str, size: Tuple[int, int], output_name: str, return_path: bool) -> Dict[str, Any]:
199 |         try:
200 |             # Input validation
201 |             if not size or len(size) != 2 or size[0] <= 0 or size[1] <= 0:
202 |                 return {
203 |                     "success": False,
204 |                     "error": "Size must be a tuple of two positive integers (width, height)",
205 |                     "message": "Invalid size parameters"
206 |                 }
207 |             
208 |             output_path = get_output_path(output_name)
209 |             video = VideoStore.load(video_path)
210 |             resized_video = video.resize(newsize=size)
211 |             
212 |             if return_path:
213 |                 resized_video.write_videofile(output_path)
214 |                 return {
215 |                     "success": True,
216 |                     "output_path": output_path,
217 |                     "message": "Video resized successfully"
218 |                 }
219 |             else:
220 |                 ref = VideoStore.store(resized_video)
221 |                 return {
222 |                     "success": True,
223 |                     "output_object": ref
224 |                 }
225 |         except Exception as e:
226 |             logger.error(f"Error resizing video {video_path}: {e}")
227 |             return {
228 |                 "success": False,
229 |                 "error": str(e),
230 |                 "error_type": type(e).__name__,
231 |                 "message": "Error resizing video"
232 |             }
233 | 
234 |     @mcp.tool(description="Use this tool to crop a video. Provide x1, y1, x2, y2 coordinates and an output name such as cropped_video.mp4. If further steps follow the cropping, set return_path to false so an object reference is returned; otherwise set return_path to true.")
235 |     def crop_video(video_path: str, x1: int, y1: int, x2: int, y2: int, output_name: str, return_path: bool) -> Dict[str, Any]:
236 |         try:
237 |             # Input validation
238 |             if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1:
239 |                 return {
240 |                     "success": False,
241 |                     "error": "Invalid crop coordinates. x2 > x1 and y2 > y1, all values must be non-negative",
242 |                     "message": "Invalid crop parameters"
243 |                 }
244 |             
245 |             output_path = get_output_path(output_name)
246 |             video = VideoStore.load(video_path)
247 |             cropped_video = crop(video, x1, y1, x2, y2)
248 |             
249 |             if return_path:
250 |                 cropped_video.write_videofile(output_path)
251 |                 return {
252 |                     "success": True,
253 |                     "output_path": output_path,
254 |                     "message": "Video cropped successfully"
255 |                 }
256 |             else:
257 |                 ref = VideoStore.store(cropped_video)
258 |                 return {
259 |                     "success": True,
260 |                     "output_object": ref
261 |                 }
262 |         except Exception as e:
263 |             logger.error(f"Error cropping video {video_path}: {e}")
264 |             return {
265 |                 "success": False,
266 |                 "error": str(e),
267 |                 "error_type": type(e).__name__,
268 |                 "message": "Error cropping video"
269 |             }
270 | 
271 |     @mcp.tool(description="Use this tool to rotate a video. Provide only an output file name such as rotated_video.mp4 (not a path), choosing a meaningful name based on the video. If further steps follow the rotation, set return_path to false so an object reference is returned; otherwise set return_path to true.")
272 |     def rotate_video(video_path: str, angle: int, output_name: str, return_path: bool) -> Dict[str, Any]:
273 |         try:
274 |             # Input validation
275 |             if not isinstance(angle, (int, float)):
276 |                 return {
277 |                     "success": False,
278 |                     "error": "Angle must be a number",
279 |                     "message": "Invalid angle parameter"
280 |                 }
281 |             
282 |             output_path = get_output_path(output_name)
283 |             video = VideoStore.load(video_path)
284 |             rotated_video = rotate(video, angle)
285 |             
286 |             if return_path:
287 |                 rotated_video.write_videofile(output_path)
288 |                 return {
289 |                     "success": True,
290 |                     "output_path": output_path,
291 |                     "message": "Video rotated successfully"
292 |                 }
293 |             else:
294 |                 ref = VideoStore.store(rotated_video)
295 |                 return {
296 |                     "success": True,
297 |                     "output_object": ref
298 |                 }
299 |         except Exception as e:
300 |             logger.error(f"Error rotating video {video_path}: {e}")
301 |             return {
302 |                 "success": False,
303 |                 "error": str(e),
304 |                 "error_type": type(e).__name__,
305 |                 "message": "Error rotating video"
306 |             }
307 | 
308 |     @mcp.tool(description="Use this tool to change the playback speed of a video (for example, 2.0 for 2x speed). Provide a meaningful output file name such as speed_up_video.mp4 (a name only, not a path). If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
309 |     def speed_up_video(video_path: str, speed: float, output_name: str, return_path: bool) -> Dict[str, Any]:
310 |         try:
311 |             # Input validation
312 |             if speed <= 0:
313 |                 return {
314 |                     "success": False,
315 |                     "error": "Speed must be positive (e.g., 2.0 for 2x speed)",
316 |                     "message": "Invalid speed parameter"
317 |                 }
318 |             
319 |             output_path = get_output_path(output_name)
320 |             video = VideoStore.load(video_path)
321 |             sped_up_video = speedx(video, speed)
322 | 
323 |             if return_path:
324 |                 sped_up_video.write_videofile(output_path)
325 |                 return {
326 |                     "success": True,
327 |                     "output_path": output_path,
328 |                     "message": "Video speed changed successfully"
329 |                 }
330 |             else:
331 |                 ref = VideoStore.store(sped_up_video)
332 |                 return {
333 |                     "success": True,
334 |                     "output_object": ref
335 |                 }
336 |         except Exception as e:
337 |             logger.error(f"Error changing video speed {video_path}: {e}")
338 |             return {
339 |                 "success": False,
340 |                 "error": str(e),
341 |                 "error_type": type(e).__name__,
342 |                 "message": "Error changing video speed"
343 |             }
344 | 
345 |     @mcp.tool(description="Use this tool to add an audio track to a video. Provide a meaningful output file name with the .mp4 extension, such as added_audio.mp4. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
346 |     def add_audio(video_path: str, audio_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
347 |         try:
348 |             output_path = get_output_path(output_name)
349 |             video = VideoStore.load(video_path)
350 |             audio = AudioStore.load(audio_path)
351 |             new_video = video.set_audio(audio)
352 |             
353 |             if return_path:
354 |                 new_video.write_videofile(output_path)
355 |                 return {
356 |                     "success": True,
357 |                     "output_path": output_path,
358 |                     "message": "Audio added successfully"
359 |                 }
360 |             else:
361 |                 ref = VideoStore.store(new_video)
362 |                 return {
363 |                     "success": True,
364 |                     "output_object": ref
365 |                 }
366 |         except Exception as e:
367 |             logger.error(f"Error adding audio to video {video_path}: {e}")
368 |             return {
369 |                 "success": False,
370 |                 "error": str(e),
371 |                 "error_type": type(e).__name__,
372 |                 "message": "Error adding audio to video"
373 |             }
374 | 
375 | 
376 |     @mcp.tool(description="Use this tool to add a fade-in effect to a video. Provide fade_duration in seconds and an output name such as fadein_video.mp4. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
377 |     def fadein_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
378 |         try:
379 |             # Input validation
380 |             if fade_duration <= 0:
381 |                 return {
382 |                     "success": False,
383 |                     "error": "Fade duration must be positive",
384 |                     "message": "Invalid fade duration parameter"
385 |                 }
386 |             
387 |             output_path = get_output_path(output_name)
388 |             video = VideoStore.load(video_path)
389 |             faded_video = fadein(video, fade_duration)
390 |             
391 |             if return_path:
392 |                 faded_video.write_videofile(output_path)
393 |                 return {
394 |                     "success": True,
395 |                     "output_path": output_path,
396 |                     "message": "Fade in effect added successfully"
397 |                 }
398 |             else:
399 |                 ref = VideoStore.store(faded_video)
400 |                 return {
401 |                     "success": True,
402 |                     "output_object": ref
403 |                 }
404 |         except Exception as e:
405 |             logger.error(f"Error adding fade in effect to video {video_path}: {e}")
406 |             return {
407 |                 "success": False,
408 |                 "error": str(e),
409 |                 "error_type": type(e).__name__,
410 |                 "message": "Error adding fade in effect"
411 |             }
412 | 
413 |     @mcp.tool(description="Use this tool to add a fade-out effect to a video. Provide fade_duration in seconds and an output name such as fadeout_video.mp4. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
414 |     def fadeout_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
415 |         try:
416 |             # Input validation
417 |             if fade_duration <= 0:
418 |                 return {
419 |                     "success": False,
420 |                     "error": "Fade duration must be positive",
421 |                     "message": "Invalid fade duration parameter"
422 |                 }
423 |             
424 |             output_path = get_output_path(output_name)
425 |             video = VideoStore.load(video_path)
426 |             faded_video = fadeout(video, fade_duration)
427 |             
428 |             if return_path:
429 |                 faded_video.write_videofile(output_path)
430 |                 return {
431 |                     "success": True,
432 |                     "output_path": output_path,
433 |                     "message": "Fade out effect added successfully"
434 |                 }
435 |             else:
436 |                 ref = VideoStore.store(faded_video)
437 |                 return {
438 |                     "success": True,
439 |                     "output_object": ref
440 |                 }
441 |         except Exception as e:
442 |             logger.error(f"Error adding fade out effect to video {video_path}: {e}")
443 |             return {
444 |                 "success": False,
445 |                 "error": str(e),
446 |                 "error_type": type(e).__name__,
447 |                 "message": "Error adding fade out effect"
448 |             }
449 | 
450 |     @mcp.tool(description="Use this tool to add a text overlay to a video. Provide the text, position coordinates (x, y), font_size, color, duration in seconds, the path to the ImageMagick binary (path_of_imagemagick), and an output name. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
451 |     def add_text_overlay(video_path: str, text: str, x: int, y: int, font_size: int, color: str, duration: float, output_name: str, return_path: bool, path_of_imagemagick: str) -> Dict[str, Any]:
452 |         try:
453 |             # Input validation
454 |             if not text or not text.strip():
455 |                 return {
456 |                     "success": False,
457 |                     "error": "Text cannot be empty",
458 |                     "message": "Invalid text parameter"
459 |                 }
460 |             if font_size <= 0:
461 |                 return {
462 |                     "success": False,
463 |                     "error": "Font size must be positive",
464 |                     "message": "Invalid font size parameter"
465 |                 }
466 |             if duration <= 0:
467 |                 return {
468 |                     "success": False,
469 |                     "error": "Duration must be positive",
470 |                     "message": "Invalid duration parameter"
471 |                 }
472 |             
473 |             output_path = get_output_path(output_name)
474 |             video = VideoStore.load(video_path)
475 |             
476 |             # Configure ImageMagick
477 |             mpy_conf.change_settings({"IMAGEMAGICK_BINARY": path_of_imagemagick})
478 |             
479 |             # Create a TextClip with specified parameters
480 |             text_clip = TextClip(text, fontsize=font_size, color=color)
481 |             
482 |             # Set position using the provided x, y coordinates and duration
483 |             text_clip = text_clip.set_position((x, y)).set_duration(duration)
484 |             
485 |             # Overlay text on video
486 |             final_video = CompositeVideoClip([video, text_clip])
487 |             
488 |             if return_path:
489 |                 final_video.write_videofile(output_path, fps=final_video.fps)
490 |                 return {
491 |                     "success": True,
492 |                     "output_path": output_path,
493 |                     "message": "Text overlay added successfully"
494 |                 }
495 |             else:
496 |                 ref = VideoStore.store(final_video)
497 |                 return {
498 |                     "success": True,
499 |                     "output_object": ref,
500 |                     "message": "Text overlay added successfully"
501 |                 }
502 | 
503 |         except Exception as e:
504 |             logger.error(f"Error adding text overlay to video {video_path}: {e}")
505 |             return {
506 |                 "success": False,
507 |                 "error": str(e),
508 |                 "error_type": type(e).__name__,
509 |                 "message": "Error adding text overlay. Make sure ImageMagick is installed and path is correct."
510 |             }
511 | 
512 |     @mcp.tool(description="Use this tool to add an image watermark or overlay to a video. Provide image_path, position coordinates (x, y), duration in seconds, and an output name. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
513 |     def add_image_overlay(video_path: str, image_path: str, x: int, y: int, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
514 |         try:
515 |             # Input validation
516 |             if duration <= 0:
517 |                 return {
518 |                     "success": False,
519 |                     "error": "Duration must be positive",
520 |                     "message": "Invalid duration parameter"
521 |                 }
522 |             
523 |             output_path = get_output_path(output_name)
524 |             video = VideoStore.load(video_path)
525 |             logo = ImageClip(image_path).set_duration(duration).set_position((x, y))
526 |             final_video = CompositeVideoClip([video, logo])
527 |             
528 |             if return_path:
529 |                 final_video.write_videofile(output_path)
530 |                 return {
531 |                     "success": True,
532 |                     "output_path": output_path,
533 |                     "message": "Image overlay added successfully"
534 |                 }
535 |             else:
536 |                 ref = VideoStore.store(final_video)
537 |                 return {
538 |                     "success": True,
539 |                     "output_object": ref
540 |                 }
541 |         except Exception as e:
542 |             logger.error(f"Error adding image overlay to video {video_path}: {e}")
543 |             return {
544 |                 "success": False,
545 |                 "error": str(e),
546 |                 "error_type": type(e).__name__,
547 |                 "message": "Error adding image overlay"
548 |             }
549 | 
550 |     @mcp.tool(description="Use this tool to convert a video to grayscale (black and white). Provide an output name such as grayscale_video.mp4. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
551 |     def grayscale_video(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
552 |         try:
553 |             output_path = get_output_path(output_name)
554 |             video = VideoStore.load(video_path)
555 |             gray_video = video.fx(blackwhite)
556 |             
557 |             if return_path:
558 |                 gray_video.write_videofile(output_path)
559 |                 return {
560 |                     "success": True,
561 |                     "output_path": output_path,
562 |                     "message": "Video converted to grayscale successfully"
563 |                 }
564 |             else:
565 |                 ref = VideoStore.store(gray_video)
566 |                 return {
567 |                     "success": True,
568 |                     "output_object": ref
569 |                 }
570 |         except Exception as e:
571 |             logger.error(f"Error converting video to grayscale {video_path}: {e}")
572 |             return {
573 |                 "success": False,
574 |                 "error": str(e),
575 |                 "error_type": type(e).__name__,
576 |                 "message": "Error converting video to grayscale"
577 |             }
578 | 
579 |     @mcp.tool(description="Use this tool to create a video from an image sequence. Provide the path of the folder containing the images, fps, and an output name. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
580 |     def images_to_video(images_folder_path:str, fps:int, output_name:str, return_path:bool) -> Dict[str,Any]:
581 |         try:
582 |             output_path = get_output_path(output_name)
583 |             clip = ImageSequenceClip(images_folder_path, fps=fps)
584 |             if return_path:
585 |                 clip.write_videofile(output_path)
586 |                 return {
587 |                     "success": True,
588 |                     "output_path": output_path,
589 |                     "message": "Video created from images successfully"
590 |                 }
591 |             else:
592 |                 ref = VideoStore.store(clip)
593 |                 return {
594 |                     "success": True,
595 |                     "output_object": ref
596 |                 }
597 |         except Exception as e:
598 |             logger.error(f"Error creating video from images {images_folder_path}: {e}")
599 |             return {
600 |                 "success": False,
601 |                 "error": str(e),
602 |                 "error_type": type(e).__name__,
603 |                 "message": "Error creating video from images"
604 |             }
605 | 
606 |     @mcp.tool(description="Use this tool to extract frames from a video as images. Provide start_time and end_time in seconds, the extraction fps, and output_folder_name. If further steps follow, set return_path to false so that the result is returned as an object; otherwise set it to true to write the frames as PNG files and return the folder path.")
607 |     def extract_frames(video_path:str, start_time:float, end_time:float, fps:int, output_folder_name:str, return_path:bool) -> Dict[str,Any]:
608 |         try:
609 |             video = VideoStore.load(video_path)
610 |             subclip = video.subclip(start_time, end_time)
611 |             if return_path:
612 |                 os.makedirs(output_folder_name, exist_ok=True)
613 |                 for i, frame in enumerate(subclip.iter_frames(fps=fps)):
614 |                     frame_path = os.path.join(output_folder_name, f"frame_{i:04d}.png")
615 |                     imageio.imwrite(frame_path, frame)
616 |                 return {
617 |                     "success": True,
618 |                     "output_path": output_folder_name,
619 |                     "message": "Frames extracted successfully"
620 |                 }
621 |             else:
622 |                 ref = VideoStore.store(subclip.set_fps(fps))  # raw frame arrays are not JSON-serializable
623 |                 return {
624 |                     "success": True,
625 |                     "output_object": ref,
626 |                     "message": "Frame range stored as a clip object"
627 |                 }
628 |         except Exception as e:
629 |             logger.error(f"Error extracting frames from video {video_path}: {e}")
630 |             return {
631 |                 "success": False,
632 |                 "error": str(e),
633 |                 "error_type": type(e).__name__,
634 |                 "message": "Error extracting frames from video"
635 |             }
636 | 
637 |     @mcp.tool(description="Use this tool to mirror a video horizontally. Provide an output name such as mirrored_video.mp4. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
638 |     def mirror_video(video_path:str, output_name:str, return_path:bool) -> Dict[str,Any]:
639 |         try:
640 |             output_path = get_output_path(output_name)
641 |             video = VideoStore.load(video_path)
642 |             mirrored_video = video.fx(mirror_x)
643 |             if return_path:
644 |                 mirrored_video.write_videofile(output_path)
645 |                 return {
646 |                     "success": True,
647 |                     "output_path": output_path,
648 |                     "message": "Video mirrored successfully"
649 |                 }
650 |             else:
651 |                 ref = VideoStore.store(mirrored_video)
652 |                 return {
653 |                     "success": True,
654 |                     "output_object": ref
655 |                 }
656 |         except Exception as e:
657 |             logger.error(f"Error mirroring video {video_path}: {e}")
658 |             return {
659 |                 "success": False,
660 |                 "error": str(e),
661 |                 "error_type": type(e).__name__,
662 |                 "message": "Error mirroring video"
663 |             }
664 | 
665 |     @mcp.tool(description="Use this tool to split a video into multiple parts at specific timestamps. Provide a list of split times in seconds and an output name. If further editing steps follow, set return_path to false so that object references are returned; otherwise set it to true to write the files and return their paths.")
666 |     def split_video_at_times(video_path:str, split_times:List[float], output_name:str, return_path:bool) -> Dict[str,Any]:
667 |         try:
668 |             output_path = get_output_path(output_name)
669 |             video = VideoStore.load(video_path)
670 |             segments = []
671 |             split_times = [0] + sorted(split_times) + [video.duration]
672 |             
673 |             for i in range(len(split_times) - 1):
674 |                 start = split_times[i]
675 |                 end = split_times[i + 1]
676 |                 segment = video.subclip(start, end)
677 |                 segments.append(segment)
678 |                 
679 |             if return_path:
680 |                 output_paths = []
681 |                 for i, segment in enumerate(segments):
682 |                     segment_path = f"{os.path.splitext(output_path)[0]}_part_{i+1}.mp4"
683 |                     segment.write_videofile(segment_path)
684 |                     output_paths.append(segment_path)
685 |                 return {
686 |                     "success": True,
687 |                     "output_paths": output_paths,
688 |                     "message": "Video split successfully"
689 |                 }
690 |             else:
691 |                 refs = [VideoStore.store(segment) for segment in segments]
692 |                 return {
693 |                     "success": True,
694 |                     "output_objects": refs
695 |                 }
696 |         except Exception as e:
697 |             logger.error(f"Error splitting video at times {video_path}: {e}")
698 |             return {
699 |                 "success": False,
700 |                 "error": str(e),
701 |                 "error_type": type(e).__name__,
702 |                 "message": "Error splitting video at times"
703 |             }
704 | 
705 |     @mcp.tool(description="Use this tool to convert a video's format with control over codec and quality. Provide codec and, optionally, fps and bitrate. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
706 |     def convert_video_format(video_path:str, output_name:str, codec:str, fps:Optional[int], bitrate:Optional[str], return_path:bool) -> Dict[str,Any]:
707 |         try:
708 |             output_path = get_output_path(output_name)
709 |             video = VideoStore.load(video_path)
710 |             write_kwargs = {"codec": codec}
711 |             if fps:
712 |                 write_kwargs["fps"] = fps
713 |             if bitrate:
714 |                 write_kwargs["bitrate"] = bitrate
715 |                 
716 |             if return_path:
717 |                 video.write_videofile(output_path, **write_kwargs)
718 |                 return {
719 |                     "success": True,
720 |                     "output_path": output_path,
721 |                     "message": "Video format converted successfully"
722 |                 }
723 |             else:
724 |                 ref = VideoStore.store(video)  # codec, fps and bitrate only take effect when the file is written
725 |                 return {
726 |                     "success": True,
727 |                     "output_object": ref
728 |                 }
729 |         except Exception as e:
730 |             logger.error(f"Error converting video format {video_path}: {e}")
731 |             return {
732 |                 "success": False,
733 |                 "error": str(e),
734 |                 "error_type": type(e).__name__,
735 |                 "message": "Error converting video format"
736 |             }
737 | 
738 |     @mcp.tool(description="Use this tool to overlay one video on another with transparency. Provide the overlay video, position coordinates (x, y), opacity (0-1), duration in seconds, and an output name. If further editing steps follow, set return_path to false so that an object reference is returned; otherwise set it to true to write the file and return its path.")
739 |     def add_video_overlay(base_video_path:str, overlay_video_path:str, x:int, y:int, opacity:float, output_name:str, return_path:bool,duration:float) -> Dict[str,Any]:
740 |         try:
741 |             output_path = get_output_path(output_name)
742 |             base_video = VideoStore.load(base_video_path)
743 |             overlay_video = VideoStore.load(overlay_video_path)
744 |             
745 |             overlay_positioned = overlay_video.set_position((x, y)).set_opacity(opacity).set_duration(duration)
746 |             final_video = CompositeVideoClip([base_video, overlay_positioned])
747 |             
748 |             if return_path:
749 |                 final_video.write_videofile(output_path)
750 |                 return {
751 |                     "success": True,
752 |                     "output_path": output_path,
753 |                     "message": "Video overlay added successfully"
754 |                 }
755 |             else:
756 |                 ref = VideoStore.store(final_video)
757 |                 return {
758 |                     "success": True,
759 |                     "output_object": ref
760 |                 }
761 |         except Exception as e:
762 |             logger.error(f"Error adding video overlay {base_video_path}: {e}")
763 |             return {
764 |                 "success": False,
765 |                 "error": str(e),
766 |                 "error_type": type(e).__name__,
767 |                 "message": "Error adding video overlay"
768 |             }
769 | 
770 | 
```
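
`split_video_at_times` builds its segment boundaries from `[0] + split_times + [video.duration]`, which presumes the caller passes timestamps that are in order and inside the clip. The following is a minimal sketch of how those boundaries could be normalized before any `subclip` call. The helper name `segment_bounds` is hypothetical and not part of this repository, and the sketch uses only plain Python so it can be checked without MoviePy.

```python
from typing import List, Tuple


def segment_bounds(split_times: List[float], duration: float) -> List[Tuple[float, float]]:
    """Turn split timestamps into (start, end) pairs covering [0, duration].

    Hypothetical helper for illustration; not part of video_edit_mcp.
    """
    # Keep only cut points strictly inside the clip, deduplicated and sorted.
    cuts = sorted({t for t in split_times if 0 < t < duration})
    edges = [0.0] + cuts + [duration]
    # Pair each edge with its successor to get one (start, end) per segment.
    return list(zip(edges[:-1], edges[1:]))


if __name__ == "__main__":
    # Unsorted, duplicated and out-of-range inputs are tolerated.
    for start, end in segment_bounds([20, 5, 5, 99], duration=30.0):
        print(f"segment from {start}s to {end}s")
```

Each returned pair could then be passed to `video.subclip(start, end)`. Dropping out-of-range timestamps silently is a design choice made here for brevity; returning an error response instead would match the input validation style of the other tools.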