# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── MANIFEST.in
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   └── video_edit_mcp
│       ├── __init__.py
│       ├── audio_operations.py
│       ├── download_utils.py
│       ├── main.py
│       ├── util_tools.py
│       ├── utils.py
│       └── video_operations.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so

# Distribution / packaging
build/
dist/
*.egg-info/
*.egg

# Virtual environments
.env
.venv
env/
venv/
ENV/

# Testing
.pytest_cache/
.coverage
htmlcov/

# Media files (project-specific)
*.mp4
*.avi
*.mov
*.mkv
*.flv
*.wmv
*.webm
*.mp3
*.wav
*.aac
*.ogg
*.flac
*.m4a
*.jpg
*.jpeg
*.png
*.gif
*.bmp
*.tiff
*.svg

# Video processing outputs
video_store/
output/
temp/
downloads/

# IDE
.vscode/
.idea/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db
Desktop.ini

# Temporary files
*.tmp
*.temp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Video Edit MCP Server 🎬

A powerful **Model Context Protocol (MCP)** server designed for advanced video and audio editing operations.
This server enables MCP clients, such as Claude Desktop, Cursor, and others, to perform comprehensive multimedia editing tasks through a standardized and unified interface.

https://github.com/user-attachments/assets/134b8b82-80b1-4678-8930-ab53121b121f

## ✨ Key Features

### 🎥 Video Operations
- **Basic Editing**: Trim, merge, resize, crop, rotate videos
- **Effects**: Speed control, fade in/out, grayscale, mirror
- **Overlays**: Add text, images, or video overlays with transparency
- **Format Conversion**: Convert between formats with codec control
- **Frame Operations**: Extract frames, create videos from images

### 🎵 Audio Operations
- **Audio Processing**: Extract, trim, loop, concatenate audio
- **Volume Control**: Adjust levels, fade in/out effects
- **Audio Mixing**: Mix multiple tracks together
- **Integration**: Add audio to videos, replace soundtracks

### 📥 Download & Utilities
- **Video Download**: Download from YouTube and other platforms
- **File Management**: Directory operations, file listing
- **Path Suggestions**: Get recommended download locations

### 🧹 Memory & Cleanup
- **Smart Memory**: Chain operations without saving intermediate files
- **Resource Management**: Clear memory, check stored objects
- **Efficient Processing**: Keep objects in memory for complex workflows

### 🔗 Operation Chaining
Seamlessly chain multiple operations together without creating intermediate files. Process your video through multiple steps (trim → add audio → apply effects → add text) while keeping everything in memory for optimal performance.
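
As a rough illustration of how chaining by reference can work, the sketch below keeps each intermediate result in an in-memory store and passes only an opaque reference string between steps. It is a simplified model, not the server's actual code: `ClipStore`, `trim`, and `fade_in` are hypothetical names, and plain dicts stand in for MoviePy clips.

```python
import uuid
from typing import Any, Dict


class ClipStore:
    """Hypothetical in-memory store: maps reference strings to objects."""

    _store: Dict[str, Any] = {}

    @classmethod
    def store(cls, clip: Any) -> str:
        # Keep the object in memory and hand back an opaque reference.
        ref = str(uuid.uuid4())
        cls._store[ref] = clip
        return ref

    @classmethod
    def load(cls, ref_or_path: str) -> Any:
        # A known reference resolves to the stored object; anything else is
        # treated as a file path (a real server would open the media file).
        if ref_or_path in cls._store:
            return cls._store[ref_or_path]
        return {"loaded_from": ref_or_path}

    @classmethod
    def clear(cls) -> None:
        cls._store.clear()


def trim(source: str, start: float, end: float) -> str:
    # Placeholder edit: record the step instead of touching real media.
    clip = ClipStore.load(source)
    return ClipStore.store({"parent": clip, "op": ("trim", start, end)})


def fade_in(source: str, seconds: float) -> str:
    clip = ClipStore.load(source)
    return ClipStore.store({"parent": clip, "op": ("fade_in", seconds)})


# Chain two steps; only reference strings travel between them.
trimmed_ref = trim("input.mp4", 10, 60)
final_ref = fade_in(trimmed_ref, 1.5)
```

Because each step accepts either a file path or a reference, a client can start from a file on disk and write to disk only at the last step.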

## 📋 Requirements

- **Python 3.10 or higher**
- **moviepy==1.0.3**
- **yt-dlp>=2023.1.6**
- **mcp>=1.12.2**
- **typing-extensions>=4.0.0**
- **Pillow==9.5.0**

## ⚙️ Installation & Setup

### For Claude Desktop / Cursor MCP Integration

**Ensure that `uv` is installed.**
If not, install it using the following PowerShell command:

```powershell
powershell -ExecutionPolicy Bypass -Command "irm https://astral.sh/uv/install.ps1 | iex"
```

Add this configuration to your MCP configuration file:

```json
{
  "mcpServers": {
    "video_editing": {
      "command": "uvx",
      "args": [
        "--python",
        "3.11",
        "video-edit-mcp"
      ]
    }
  }
}
```

**Configuration file locations:**
- **Claude Desktop (Windows)**: `%APPDATA%/Claude/claude_desktop_config.json`
- **Claude Desktop (macOS)**: `~/Library/Application Support/Claude/claude_desktop_config.json`
- **Cursor**: `.cursor/mcp.json` in your project root

### Manual Installation

```bash
git clone https://github.com/Aditya2755/video-edit-mcp.git
cd video-edit-mcp
pip install -r requirements.txt
pip install -e .
```

## 🏗️ Project Structure

```
video_edit_mcp/
├── src/
│   └── video_edit_mcp/
│       ├── __init__.py
│       ├── main.py                 # MCP server implementation
│       ├── video_operations.py     # Video editing tools
│       ├── audio_operations.py     # Audio processing tools
│       ├── download_utils.py       # Download functionality
│       ├── util_tools.py           # Memory & utility tools
│       ├── utils.py                # Utility functions
│
├── pyproject.toml                  # Project configuration
├── requirements.txt                # Dependencies
├── uv.lock                         # Lock file
├── LICENSE                         # MIT License
├── MANIFEST.in                     # Manifest file
└── README.md
```

## 🎯 Example Usage

```python
# Chain operations without intermediate files
video_info = get_video_info("input.mp4")
trimmed = trim_video("input.mp4", 10, 60, return_path=False)  # Keep in memory
with_audio = add_audio(trimmed, "background.mp3", return_path=False)
final = add_text_overlay(with_audio, "Hello World", x=100, y=50, return_path=True)
```

## 🚀 Future Enhancements & Contributions

We welcome contributions in these exciting areas:

### 🤖 AI-Powered Features
- **Speech-to-Text (STT)**: Automatic subtitle generation and transcription
- **Text-to-Speech (TTS)**: AI voice synthesis for narration
- **Audio Enhancement**: AI-based noise reduction and audio quality improvement
- **Smart Timestamps**: Automatic scene detection and chapter generation
- **Face Tracking**: Advanced face detection and tracking for automatic editing
- **Object Recognition**: Track and edit based on detected objects
- **Content Analysis**: AI-powered content categorization and tagging

## 🤝 Contributing

1. Fork the repository
2. Create a feature branch: `git checkout -b feature/amazing-feature`
3.
Commit your changes: `git commit -m 'Add amazing feature'`
4. Push to the branch: `git push origin feature/amazing-feature`
5. Open a Pull Request

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

---

<div align="center">

**Made with ❤️ for the AI and multimedia editing community**

[⭐ Star this project](https://github.com/Aditya2755/video-edit-mcp) | [🤝 Contribute](https://github.com/Aditya2755/video-edit-mcp/contribute) | [📖 Documentation](https://github.com/Aditya2755/video-edit-mcp#readme)

</div>
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
moviepy==1.0.3
yt-dlp>=2023.1.6
mcp>=1.12.2
Pillow==9.5.0
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""Video Edit MCP Server - A powerful MCP server for video editing operations using MoviePy."""

__version__ = "0.1.1"


from .main import mcp

__all__ = ["mcp"]
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/main.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP
import logging
from .video_operations import register_video_tools
from .audio_operations import register_audio_tools
from .download_utils import register_download_and_utility_tools
from .util_tools import register_util_tools
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP()

# Register all tools from different
# modules
register_video_tools(mcp)
register_audio_tools(mcp)
register_download_and_utility_tools(mcp)
register_util_tools(mcp)

def main():
    """Entry point for the MCP server"""
    mcp.run(transport="streamable-http")

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/utils.py:
--------------------------------------------------------------------------------
```python
# Simple cross-platform output directory helper
import os
from pathlib import Path
import uuid
from moviepy.editor import VideoFileClip, AudioFileClip
from PIL import Image, ImageDraw, ImageFont
import tempfile
import logging

logger = logging.getLogger(__name__)

def get_output_path(filename: str) -> str:
    """Get cross-platform output path for files"""
    # Use environment variable if set, otherwise default to Downloads
    output_dir = os.environ.get("VIDEO_MCP_OUTPUT_DIR", str(Path.home() / "Downloads" / "video_mcp_output"))
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    return os.path.join(output_dir, filename)

class VideoStore:
    _store = {}

    @classmethod
    def store(cls, video_clip) -> str:
        ref = str(uuid.uuid4())
        cls._store[ref] = video_clip
        return ref

    @classmethod
    def load(cls, video_ref: str):
        if video_ref in cls._store:
            return cls._store[video_ref]
        return VideoFileClip(video_ref)

    @classmethod
    def clear(cls):
        cls._store.clear()

class AudioStore:
    _store = {}

    @classmethod
    def store(cls, audio_clip) -> str:
        ref = str(uuid.uuid4())
        cls._store[ref] = audio_clip
        return ref

    @classmethod
    def load(cls, audio_ref: str):
        if audio_ref in cls._store:
            return cls._store[audio_ref]
        return AudioFileClip(audio_ref)

    @classmethod
    def clear(cls):
        cls._store.clear()
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "video-edit-mcp"
version = "0.1.1"
description = "A powerful Model Context Protocol server for video editing operations using MoviePy"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
    {name = "Aditya_Barasiya", email = "[email protected]"},
]
keywords = ["mcp", "video", "editing", "moviepy", "ai", "server", "claude", "video-processing"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Topic :: Multimedia :: Video",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]

dependencies = [
    "mcp>=1.12.2",
    "moviepy==1.0.3",
    "yt-dlp>=2023.1.6",
    "typing-extensions>=4.0.0",
    "Pillow==9.5.0"
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "black>=22.0",
    "flake8>=4.0",
    "mypy>=1.0",
]

[project.urls]
Homepage = "https://github.com/Aditya2755/video-edit-mcp"
Documentation = "https://github.com/Aditya2755/video-edit-mcp#readme"
Repository = "https://github.com/Aditya2755/video-edit-mcp.git"
"Bug Tracker" = "https://github.com/Aditya2755/video-edit-mcp/issues"

[project.scripts]
video-edit-mcp = "video_edit_mcp.main:main"

[tool.hatch.build.targets.wheel]
packages = ["src/video_edit_mcp"]

[tool.hatch.build.targets.sdist]
include = [
    "/src",
    "/README.md",
    "/LICENSE",
    "/requirements.txt",
]

[tool.black]
line-length = 88
target-version = ['py310']  # Corrected: Changed from 'py38' to 'py310'

[tool.mypy]
python_version = "3.10"  # Corrected: Changed from "3.8" to "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/util_tools.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any
from moviepy.editor import VideoFileClip, AudioFileClip
import os
import logging
from .utils import VideoStore, AudioStore

logger = logging.getLogger(__name__)


def register_util_tools(mcp):
    @mcp.tool(description="Use this tool to check what is stored in memory, like objects and etc.")
    def check_memory(store_type: str = "both") -> Dict[str, Any]:
        """
        Check what objects are stored in memory.

        Args:
            store_type: Type of store to check ("video", "audio", or "both")
        """
        try:
            if store_type.lower() == "video":
                return {
                    "success": True,
                    "video_memory": VideoStore._store,
                    "video_count": len(VideoStore._store)
                }
            elif store_type.lower() == "audio":
                return {
                    "success": True,
                    "audio_memory": AudioStore._store,
                    "audio_count": len(AudioStore._store)
                }
            else:  # both or any other value
                return {
                    "success": True,
                    "video_memory": VideoStore._store,
                    "audio_memory": AudioStore._store,
                    "video_count": len(VideoStore._store),
                    "audio_count": len(AudioStore._store),
                    "total_objects": len(VideoStore._store) + len(AudioStore._store)
                }
        except Exception as e:
            logger.error(f"Error checking memory: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error checking memory"
            }

    @mcp.tool(description="Use this tool for clearing all stored video and audio objects from memory to free up space")
    def clear_memory(clear_videos:bool, clear_audios:bool) -> Dict[str,Any]:
        try:
            if clear_videos:
                VideoStore.clear()
            if clear_audios:
                AudioStore.clear()
            return {
                "success": True,
                "message": f"Memory cleared - Videos: {clear_videos}, Audios: {clear_audios}"
            }
        except Exception as e:
            logger.error(f"Error clearing memory: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error clearing memory"
            }

    @mcp.tool(description="Use this tool for listing files in a directory, provide directory path")
    def list_files(directory_path: str) -> Dict[str, Any]:
        try:
            if not os.path.exists(directory_path):
                return {
                    "success": False,
                    "error": "Directory does not exist",
                    "message": "Provide valid directory path"
                }
            files = \
                os.listdir(directory_path)
            return {
                "success": True,
                "files": files
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Error listing files"
            }

    @mcp.tool(description="Use this tool to create a directory to store output files, make sure to provide accurate path")
    def make_directory(directory_path: str) -> Dict[str, Any]:
        try:
            os.makedirs(directory_path, exist_ok=True)
            return {
                "success": True,
                "message": "Directory created successfully"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Error creating directory"
            }
```

--------------------------------------------------------------------------------
/src/video_edit_mcp/download_utils.py:
--------------------------------------------------------------------------------
```python
import yt_dlp
import os
import logging
from typing import Dict, Any, Optional, List
from .utils import VideoStore, AudioStore

logger = logging.getLogger(__name__)

def get_default_download_paths():
    """Get common download directory paths"""
    user_home = os.path.expanduser("~")
    return {
        "user_downloads": os.path.join(user_home, "Downloads"),
        "desktop": os.path.join(user_home, "Desktop"),
        "documents": os.path.join(user_home, "Documents"),
        "project_root": os.path.abspath(".") if "video_edit_mcp" in os.path.abspath(".") else None
    }

def register_download_and_utility_tools(mcp):
    """Register all download and utility tools with the MCP server"""
    @mcp.tool(description= "use this tool to download videos make sure to give proper path for saving video not just name, if there are multiple steps to be done after downloading then make sure to return object and return path should be false else return path should be true")
    def download_video(
        url: str,
        save_path:
            Optional[str] = None,
        audio_only: bool = False,
        **yt_dlp_options: Any
    ) -> Dict[str, Any]:
        """
        Download video or audio from URL using yt-dlp

        Args:
            url: URL to download from
            save_path: Directory path or full file path template where to save
            audio_only: If True, download and extract audio only
            **yt_dlp_options: Additional yt-dlp options

        Returns:
            Dict with success status, file path, and other info
        """
        try:
            # Start with user-provided yt-dlp options
            ydl_opts = yt_dlp_options.copy()

            # --- Handle Core Parameters ---

            # 1. Set output path template if not already specified by user
            if 'outtmpl' not in ydl_opts:
                if save_path:
                    # Check if user provided a relative path that might cause confusion
                    if not os.path.isabs(save_path) and not save_path.startswith('.'):
                        logger.warning(f"Relative path detected: '{save_path}'. This will save relative to current directory: {os.getcwd()}")

                    # Convert save_path to absolute path first
                    save_path = os.path.abspath(save_path)

                    if os.path.isdir(save_path):
                        # save_path is a directory
                        ydl_opts['outtmpl'] = os.path.join(save_path, '%(title)s [%(id)s].%(ext)s')
                    else:
                        # save_path could be a full path or template
                        # Check if it contains yt-dlp template variables
                        if '%(' in save_path:
                            ydl_opts['outtmpl'] = save_path
                        else:
                            # Treat as full file path, ensure directory exists
                            directory = os.path.dirname(save_path)
                            if directory and not os.path.exists(directory):
                                os.makedirs(directory, exist_ok=True)
                            ydl_opts['outtmpl'] = save_path
                else:
                    # Default to user's Downloads folder instead of current directory
                    default_paths = get_default_download_paths()
                    downloads_dir = default_paths["user_downloads"]

                    # Create Downloads directory if it doesn't exist
                    os.makedirs(downloads_dir, exist_ok=True)
                    ydl_opts['outtmpl'] = \
                        os.path.join(downloads_dir, '%(title)s [%(id)s].%(ext)s')

            # 2. Configure for audio-only downloads
            if audio_only:
                # Set format to best audio
                ydl_opts['format'] = 'bestaudio/best'
                # Ensure post-processor is set to extract audio (defaults to mp3)
                if 'postprocessors' not in ydl_opts:
                    ydl_opts['postprocessors'] = []

                # Add audio extraction post-processor
                audio_postprocessor = {
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': ydl_opts.get('audio_format', 'mp3'),
                    'preferredquality': ydl_opts.get('audio_quality', '192'),
                }
                ydl_opts['postprocessors'].append(audio_postprocessor)

            # Add logger
            ydl_opts['logger'] = logger

            # --- Execute Download ---
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Extract info and download
                info = ydl.extract_info(url, download=not ydl_opts.get('simulate', False))

                # Prepare response
                response = {
                    "success": True,
                    "message": "Operation successful!",
                    "title": info.get('title', 'Unknown'),
                    "id": info.get('id', 'Unknown'),
                    "duration": info.get('duration'),
                    "uploader": info.get('uploader')
                }

                # If not simulating, get the actual file path
                if not ydl_opts.get('simulate', False):
                    # Get the base filename that yt-dlp would use
                    base_filepath = ydl.prepare_filename(info)

                    # Convert to absolute path
                    base_filepath = os.path.abspath(base_filepath)

                    # For audio-only downloads, the file extension will change
                    if audio_only:
                        # Find the audio codec used
                        audio_codec = ydl_opts.get('audio_format', 'mp3')
                        # Change extension to match the extracted audio format
                        base_name = os.path.splitext(base_filepath)[0]
                        actual_filepath = f"{base_name}.{audio_codec}"
                    else:
                        actual_filepath = base_filepath

                    # Ensure we have absolute path
                    actual_filepath = os.path.abspath(actual_filepath)

                    # Verify the file exists
                    if os.path.exists(actual_filepath):
                        response.update({
                            "filepath": actual_filepath,
                            "filename": os.path.basename(actual_filepath),
                            "directory": os.path.dirname(actual_filepath),
                            "file_size": os.path.getsize(actual_filepath)
                        })
                    else:
                        # If exact path doesn't exist, try to find the actual file
                        # This handles cases where yt-dlp might have modified the filename
                        directory = os.path.dirname(actual_filepath)
                        base_name = os.path.splitext(os.path.basename(actual_filepath))[0]

                        # Look for files with similar names in the directory
                        if os.path.exists(directory):
                            found_file = None
                            for file in os.listdir(directory):
                                if base_name in file or info.get('id', '') in file:
                                    found_file = file
                                    break

                            if found_file:
                                found_filepath = os.path.abspath(os.path.join(directory, found_file))
                                response.update({
                                    "filepath": found_filepath,
                                    "filename": found_file,
                                    "directory": os.path.dirname(found_filepath),
                                    "file_size": os.path.getsize(found_filepath),
                                    "note": "Filename was modified during download"
                                })
                            else:
                                response.update({
                                    "filepath": actual_filepath,
                                    "filename": os.path.basename(actual_filepath),
                                    "directory": os.path.dirname(actual_filepath),
                                    "warning": "File path expected but not found at exact location",
                                    "expected_path": actual_filepath
                                })
                        else:
                            response.update({
                                "filepath": actual_filepath,
                                "filename": os.path.basename(actual_filepath),
                                "directory": os.path.dirname(actual_filepath),
                                "warning": "Directory not found",
                                "expected_path": actual_filepath
                            })
                else:
                    # Simulation mode
                    simulated_path = ydl.prepare_filename(info)
                    response["simulated_filepath"] = os.path.abspath(simulated_path)
                    response["message"] = "Simulation successful - no file downloaded"

            return \
                response

        except Exception as e:
            logger.error(f"Error during video download from {url}: {e}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "url": url,
                "message": "Download failed"
            }

    @mcp.tool(description="Get suggested download directory paths for saving videos/audio files")
    def get_download_paths() -> Dict[str, Any]:
        """
        Get suggested common download directory paths

        Returns:
            Dict with suggested download paths
        """
        try:
            paths = get_default_download_paths()
            current_dir = os.getcwd()

            return {
                "success": True,
                "current_working_directory": current_dir,
                "suggested_paths": {
                    "user_downloads": paths["user_downloads"],
                    "desktop": paths["desktop"],
                    "documents": paths["documents"],
                    "project_root": paths["project_root"],
                    "current_directory": current_dir
                },
                "usage_examples": {
                    "save_to_downloads": paths["user_downloads"],
                    "save_to_custom_folder": os.path.join(paths["user_downloads"], "YouTube_Videos"),
                    "save_to_desktop": paths["desktop"]
                },
                "note": "Use full absolute paths to avoid saving in unexpected locations"
            }
        except Exception as e:
            logger.error(f"Error getting download paths: {e}")
            return {
                "success": False,
                "error": str(e),
                "current_directory": os.getcwd(),
                "message": "Error getting download paths"
            }

```

--------------------------------------------------------------------------------
/src/video_edit_mcp/audio_operations.py:
--------------------------------------------------------------------------------
```python
from moviepy.editor import *
from moviepy.audio.fx.all import audio_loop
from moviepy.editor import CompositeAudioClip
from typing import Dict, Any, List
import os
import logging
from .utils \
    import get_output_path, AudioStore

logger = logging.getLogger(__name__)

def register_audio_tools(mcp):
    """Register all audio processing tools with the MCP server"""

    @mcp.tool(description="get audio info")
    def audio_info(audio_path:str) -> Dict[str,Any]:
        try:

            audio = AudioFileClip(audio_path)
            return{
                "success": True,
                "audio_info": {
                    "duration": audio.duration,
                    "fps": audio.fps,
                    "channels": audio.nchannels
                }
            }
        except Exception as e:
            logger.error(f"Error getting audio info for {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__
            }

    @mcp.tool(description="Use this tool for extracting audio from the video , and make sure only give output name like extracted_audio.mp3 , some_hello.mp3 etc.. don't pass path just give meaningful names based on audio info, if there are multiple steps to be done after extracting audio then make sure to return object and return path should be false else return path should be true")
    def extract_audio(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            from .utils import VideoStore
            video = VideoStore.load(video_path)

            if video.audio is None:
                return {
                    "success": False,
                    "error": "Video has no audio track",
                    "message": "No audio to extract"
                }

            audio = video.audio
            if return_path:
                audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio extracted successfully"
                }
            else:
                ref = AudioStore.store(audio)
                return {
                    "success": True,
                    "output_object": ref
                }
        except Exception as e:
            logger.error(f"Error extracting audio from {video_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type":
                    type(e).__name__,
                "message": "Error extracting audio"
            }

    @mcp.tool(description="Trim audio file, make sure to provide proper start and end time, and make sure to provide output name like trimmed_audio.mp3, some_hello.mp3 etc. don't pass path just give meaningful names based on audio info, if there are multiple steps to be done after trimming then make sure to return object and return path should be false else return path should be true")
    def trim_audio(audio_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if start_time < 0 or end_time < 0:
                return {
                    "success": False,
                    "error": "Start and end times must be non-negative",
                    "message": "Invalid time parameters"
                }
            if start_time >= end_time:
                return {
                    "success": False,
                    "error": "Start time must be less than end time",
                    "message": "Invalid time range"
                }

            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            trimmed_audio = audio.subclip(start_time, end_time)

            if return_path:
                trimmed_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio trimmed successfully"
                }
            else:
                ref = AudioStore.store(trimmed_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio trimmed successfully"
                }
        except Exception as e:
            logger.error(f"Error trimming audio {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error trimming audio"
            }

    @mcp.tool(description="Use this tool to concatenate two audios, if there are multiple steps to be done after concatenating then make sure to return object and return path should be false else return path should be true")
    def \
        concatenate_audio(audio_path_1: str, audio_path_2: str, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            output_path = get_output_path(output_name)
            audio_1 = AudioStore.load(audio_path_1)
            audio_2 = AudioStore.load(audio_path_2)
            concatenated_audio = concatenate_audioclips([audio_1, audio_2])

            if return_path:
                concatenated_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio concatenated successfully"
                }
            else:
                ref = AudioStore.store(concatenated_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio concatenated successfully"
                }
        except Exception as e:
            logger.error(f"Error concatenating audio files {audio_path_1} and {audio_path_2}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error concatenating audio files"
            }

    @mcp.tool(description="Use this tool to loop the audio, after looping make sure to provide output name like looped_audio.mp3, some_hello.mp3 etc. \
don't pass path just give meaningful names based on audio info and also make sure to provide duration in seconds, if there are multiple steps to be done after looping then make sure to return object and return path should be false else return path should be true")
    def loop_audio(audio_path: str, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if duration <= 0:
                return {
                    "success": False,
                    "error": "Duration must be positive",
                    "message": "Invalid duration parameter"
                }

            output_path = get_output_path(output_name)
            audio = AudioStore.load(audio_path)
            looped_audio = audio_loop(audio, duration=duration)

            if return_path:
                looped_audio.write_audiofile(output_path)
                return {
                    "success": True,
                    "output_path": output_path,
                    "message": "Audio looped successfully"
                }
            else:
                ref = AudioStore.store(looped_audio)
                return {
                    "success": True,
                    "output_object": ref,
                    "message": "Audio looped successfully"
                }
        except Exception as e:
            logger.error(f"Error looping audio {audio_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "Error looping audio"
            }

    @mcp.tool(description="Adjust volume of an audio, if there are multiple steps to be done after adjusting volume then make sure to return object and return path should be false else return path should be true")
    def adjust_vol(audio_path: str, volume_level: float, output_name: str, return_path: bool) -> Dict[str, Any]:
        try:
            # Input validation
            if volume_level <= 0:
                return {
                    "success": False,
                    "error": "Volume level must be positive (e.g., 1.0 for normal, 2.0 for double)",
                    "message": "Invalid volume level parameter"
                }

            output_path = get_output_path(output_name)
            audio = \
AudioStore.load(audio_path) 198 | audio_adjusted = audio.volumex(volume_level) 199 | 200 | if return_path: 201 | audio_adjusted.write_audiofile(output_path) 202 | return { 203 | "success": True, 204 | "output_path": output_path, 205 | "message": "Audio volume adjusted successfully" 206 | } 207 | else: 208 | ref = AudioStore.store(audio_adjusted) 209 | return { 210 | "success": True, 211 | "output_object": ref, 212 | "message": "Audio volume adjusted successfully" 213 | } 214 | except Exception as e: 215 | logger.error(f"Error adjusting audio volume {audio_path}: {e}") 216 | return { 217 | "success": False, 218 | "error": str(e), 219 | "error_type": type(e).__name__, 220 | "message": "Error adjusting audio volume" 221 | } 222 | 223 | @mcp.tool(description="Use this tool for audio fade in effect, provide fade_duration in seconds, and output name like fadein_audio.mp3, if there are multiple steps to be done after adding fade in then make sure to return object and return path should be false else return path should be true") 224 | def fadein_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]: 225 | try: 226 | output_path = get_output_path(output_name) 227 | audio = AudioStore.load(audio_path) 228 | fadein_audio = audio.audio_fadein(fade_duration) 229 | if return_path: 230 | fadein_audio.write_audiofile(output_path) 231 | return { 232 | "success": True, 233 | "output_path": output_path, 234 | "message": "Audio fade in effect added successfully" 235 | } 236 | else: 237 | ref = AudioStore.store(fadein_audio) 238 | return { 239 | "success": True, 240 | "output_object": ref 241 | } 242 | except Exception as e: 243 | logger.error(f"Error adding audio fade in effect {audio_path}: {e}") 244 | return { 245 | "success": False, 246 | "error": str(e), 247 | "error_type": type(e).__name__, 248 | "message": "Error adding audio fade in effect" 249 | } 250 | 251 | @mcp.tool(description="Use this tool for audio fade out effect, provide 
fade_duration in seconds, and output name like fadeout_audio.mp3, if there are multiple steps to be done after adding fade out then make sure to return object and return path should be false else return path should be true") 252 | def fadeout_audio(audio_path:str, fade_duration:float, output_name:str, return_path:bool) -> Dict[str,Any]: 253 | try: 254 | output_path = get_output_path(output_name) 255 | audio = AudioStore.load(audio_path) 256 | faded_audio = audio.audio_fadeout(fade_duration) 257 | if return_path: 258 | faded_audio.write_audiofile(output_path) 259 | return { 260 | "success": True, 261 | "output_path": output_path, 262 | "message": "Audio fade out effect added successfully" 263 | } 264 | else: 265 | ref = AudioStore.store(faded_audio) 266 | return { 267 | "success": True, 268 | "output_object": ref 269 | } 270 | except Exception as e: 271 | logger.error(f"Error adding audio fade out effect {audio_path}: {e}") 272 | return { 273 | "success": False, 274 | "error": str(e), 275 | "error_type": type(e).__name__, 276 | "message": "Error adding audio fade out effect" 277 | } 278 | 279 | @mcp.tool(description="Use this tool for mixing multiple audio tracks together, provide list of audio paths and output name, if there are multiple steps to be done after mixing audio tracks then make sure to return object and return path should be false else return path should be true") 280 | def mix_audio_tracks(audio_paths:List[str], output_name:str, return_path:bool) -> Dict[str,Any]: 281 | try: 282 | output_path = get_output_path(output_name) 283 | audio_clips = [AudioStore.load(path) for path in audio_paths] 284 | mixed_audio = CompositeAudioClip(audio_clips) 285 | mixed_audio.fps = 44100 286 | if return_path: 287 | try: 288 | mixed_audio.write_audiofile(output_path) 289 | return { 290 | "success": True, 291 | "output_path": output_path, 292 | "message": "Audio tracks mixed successfully" 293 | } 294 | except Exception as write_error: 295 | return { 296 | 
"success": False, 297 | "error": f"Failed to write file: {str(write_error)}", 298 | "message": "File writing failed" 299 | } 300 | else: 301 | return { 302 | "success": True, 303 | "output_object": AudioStore.store(mixed_audio) # return a store reference, like the other audio tools, not the raw clip 304 | } 305 | 306 | except Exception as e: 307 | return { 308 | "success": False, 309 | "error": f"Unexpected error: {str(e)}", 310 | "error_type": type(e).__name__, 311 | "message": "Unexpected error occurred" 312 | } ``` -------------------------------------------------------------------------------- /src/video_edit_mcp/video_operations.py: -------------------------------------------------------------------------------- ```python 1 | from moviepy.editor import * 2 | from moviepy.video.fx import * 3 | from moviepy.video.fx.speedx import speedx 4 | from moviepy.video.fx.rotate import rotate 5 | from moviepy.video.fx.crop import crop 6 | from moviepy.video.fx.resize import resize 7 | from moviepy.video.fx.fadein import fadein 8 | from moviepy.video.fx.fadeout import fadeout 9 | from moviepy.video.fx.blackwhite import blackwhite 10 | from moviepy.video.fx.mirror_x import mirror_x 11 | from moviepy.editor import ImageClip, CompositeVideoClip, ImageSequenceClip, TextClip 12 | from typing import Dict, Any, Optional, List, Tuple 13 | import os 14 | import logging 15 | import imageio 16 | from .utils import get_output_path, VideoStore, AudioStore 17 | import moviepy.config as mpy_conf 18 | 19 | 20 | logger = logging.getLogger(__name__) 21 | 22 | def register_video_tools(mcp): 23 | """Register all video processing tools with the MCP server""" 24 | 25 | @mcp.tool() 26 | def get_video_info(video_path: str) -> Dict[str, Any]: 27 | """Get comprehensive information about a video file including duration, fps, resolution, codec details, and audio information.""" 28 | try: 29 | # Load video file 30 | video = VideoFileClip(video_path) 31 | 32 | # Basic video information 33 | info = { 34 | "file_path": video_path, 35 | "filename": os.path.basename(video_path), 36 | "duration": 
video.duration, 37 | "fps": video.fps, 38 | "size": video.size, # (width, height) 39 | "width": video.w, 40 | "height": video.h, 41 | "aspect_ratio": round(video.w / video.h, 2) if video.h > 0 else None, 42 | } 43 | 44 | # Add reader/codec information if available 45 | if hasattr(video, 'reader') and video.reader: 46 | reader_info = { 47 | "nframes": getattr(video.reader, 'nframes', None), 48 | "bitrate": getattr(video.reader, 'bitrate', None), 49 | "codec": getattr(video.reader, 'codec', None), 50 | "pix_fmt": getattr(video.reader, 'pix_fmt', None), 51 | } 52 | # Only add non-None values 53 | info.update({k: v for k, v in reader_info.items() if v is not None}) 54 | 55 | # Audio information 56 | if video.audio is not None: 57 | audio_info = { 58 | "has_audio": True, 59 | "audio_duration": video.audio.duration, 60 | "audio_fps": video.audio.fps, 61 | "audio_channels": getattr(video.audio, 'nchannels', None), 62 | } 63 | 64 | # Add audio reader info if available 65 | if hasattr(video.audio, 'reader') and video.audio.reader: 66 | audio_reader_info = { 67 | "audio_bitrate": getattr(video.audio.reader, 'bitrate', None), 68 | "audio_codec": getattr(video.audio.reader, 'codec', None), 69 | "sample_rate": getattr(video.audio.reader, 'fps', None), 70 | } 71 | # Only add non-None values 72 | audio_info.update({k: v for k, v in audio_reader_info.items() if v is not None}) 73 | else: 74 | audio_info = { 75 | "has_audio": False, 76 | "audio_duration": None, 77 | "audio_fps": None, 78 | "audio_channels": None, 79 | } 80 | 81 | info.update(audio_info) 82 | 83 | # File size information 84 | try: 85 | file_size = os.path.getsize(video_path) 86 | info["file_size_bytes"] = file_size 87 | info["file_size_mb"] = round(file_size / (1024 * 1024), 2) 88 | except OSError: 89 | info["file_size_bytes"] = None 90 | info["file_size_mb"] = None 91 | 92 | # Calculate video quality metrics 93 | if info["duration"] and info["duration"] > 0: 94 | info["total_frames"] = int(info["fps"] * 
info["duration"]) if info["fps"] else None 95 | if info.get("file_size_bytes"): 96 | info["average_bitrate_kbps"] = round((info["file_size_bytes"] * 8) / (info["duration"] * 1000), 2) 97 | 98 | # Clean up video object to prevent memory leaks 99 | video.close() 100 | 101 | return { 102 | "success": True, 103 | "video_info": info 104 | } 105 | 106 | except Exception as e: 107 | logger.error(f"Error getting video info for {video_path}: {e}") 108 | return { 109 | "success": False, 110 | "error": str(e), 111 | "error_type": type(e).__name__ 112 | } 113 | finally: 114 | # Ensure video object is cleaned up even if an exception occurs 115 | try: 116 | if 'video' in locals(): 117 | video.close() 118 | except Exception: 119 | pass 120 | 121 | @mcp.tool(description="Use this tool for trimming the video, provide start and end time in seconds, and output name like trimmed_video.mp4 , if there are multiple steps to be done after trimming then make sure to return object and return path should be false else return path should be true") 122 | def trim_video(video_path: str, start_time: float, end_time: float, output_name: str, return_path: bool) -> Dict[str, Any]: 123 | try: 124 | # Input validation 125 | if start_time < 0 or end_time < 0: 126 | return { 127 | "success": False, 128 | "error": "Start and end times must be non-negative", 129 | "message": "Invalid time parameters" 130 | } 131 | if start_time >= end_time: 132 | return { 133 | "success": False, 134 | "error": "Start time must be less than end time", 135 | "message": "Invalid time range" 136 | } 137 | 138 | output_path = get_output_path(output_name) 139 | video = VideoStore.load(video_path) 140 | trimmed_video = video.subclip(start_time, end_time) 141 | 142 | if return_path: 143 | trimmed_video.write_videofile(output_path) 144 | return { 145 | "success": True, 146 | "output_path": output_path, 147 | "message": "Video trimmed successfully" 148 | } 149 | else: 150 | ref = VideoStore.store(trimmed_video) 151 | return { 152 | "success": 
True, 153 | "output_object": ref 154 | } 155 | except Exception as e: 156 | logger.error(f"Error trimming video {video_path}: {e}") 157 | return { 158 | "success": False, 159 | "error": str(e), 160 | "error_type": type(e).__name__, 161 | "message": "Error trimming video" 162 | } 163 | 164 | @mcp.tool(description="Use this tool for merging two videos, provide two video paths, and output name like merged_video.mp4 , if there are multiple steps to be done after merging then make sure to return object and return path should be false else return path should be true") 165 | def merge_video(video_path: str, video_path2: str, output_name: str, return_path: bool) -> Dict[str, Any]: 166 | """Merge two videos into one.""" 167 | try: 168 | output_path = get_output_path(output_name) 169 | video1 = VideoStore.load(video_path) 170 | video2 = VideoStore.load(video_path2) 171 | merged_video = concatenate_videoclips([video1, video2]) 172 | if return_path: 173 | merged_video.write_videofile(output_path) 174 | return { 175 | "success": True, 176 | "output_path": output_path, 177 | "message": "Videos merged successfully" 178 | } 179 | else: 180 | ref = VideoStore.store(merged_video) 181 | return { 182 | "success": True, 183 | "output_object": ref, 184 | "message": "Videos merged successfully" 185 | } 186 | except Exception as e: 187 | logger.error(f"Error merging videos {video_path} and {video_path2}: {e}") 188 | return { 189 | "success": False, 190 | "error": str(e), 191 | "error_type": type(e).__name__, 192 | "message": "Error merging videos" 193 | } 194 | 195 | 196 | 197 | @mcp.tool(description="Use this tool for resizing the video make sure first whether video needs to be saved directly or just object has to be returned for further processing, if there are multiple steps to be done after resizing then make sure to return object and return path should be false else return path should be true") 198 | def resize_video(video_path: str, size: Tuple[int, int], output_name: str, 
return_path: bool) -> Dict[str, Any]: 199 | try: 200 | # Input validation 201 | if not size or len(size) != 2 or size[0] <= 0 or size[1] <= 0: 202 | return { 203 | "success": False, 204 | "error": "Size must be a tuple of two positive integers (width, height)", 205 | "message": "Invalid size parameters" 206 | } 207 | 208 | output_path = get_output_path(output_name) 209 | video = VideoStore.load(video_path) 210 | resized_video = video.resize(newsize=size) 211 | 212 | if return_path: 213 | resized_video.write_videofile(output_path) 214 | return { 215 | "success": True, 216 | "output_path": output_path, 217 | "message": "Video resized successfully" 218 | } 219 | else: 220 | ref = VideoStore.store(resized_video) 221 | return { 222 | "success": True, 223 | "output_object": ref 224 | } 225 | except Exception as e: 226 | logger.error(f"Error resizing video {video_path}: {e}") 227 | return { 228 | "success": False, 229 | "error": str(e), 230 | "error_type": type(e).__name__, 231 | "message": "Error resizing video" 232 | } 233 | 234 | @mcp.tool(description="Use this tool for cropping the video, provide x1, y1, x2, y2 coordinates, and output name like cropped_video.mp4 , if there are multiple steps to be done after cropping then make sure to return object and return path should be false else return path should be true") 235 | def crop_video(video_path: str, x1: int, y1: int, x2: int, y2: int, output_name: str, return_path: bool) -> Dict[str, Any]: 236 | try: 237 | # Input validation 238 | if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1: 239 | return { 240 | "success": False, 241 | "error": "Invalid crop coordinates. 
x2 > x1 and y2 > y1, all values must be non-negative", 242 | "message": "Invalid crop parameters" 243 | } 244 | 245 | output_path = get_output_path(output_name) 246 | video = VideoStore.load(video_path) 247 | cropped_video = crop(video, x1, y1, x2, y2) 248 | 249 | if return_path: 250 | cropped_video.write_videofile(output_path) 251 | return { 252 | "success": True, 253 | "output_path": output_path, 254 | "message": "Video cropped successfully" 255 | } 256 | else: 257 | ref = VideoStore.store(cropped_video) 258 | return { 259 | "success": True, 260 | "output_object": ref 261 | } 262 | except Exception as e: 263 | logger.error(f"Error cropping video {video_path}: {e}") 264 | return { 265 | "success": False, 266 | "error": str(e), 267 | "error_type": type(e).__name__, 268 | "message": "Error cropping video" 269 | } 270 | 271 | @mcp.tool(description="Use this tool for rotating the video, and make sure to provide output name like rotated_video.mp4 , some_hello.mp4 etc. don't pass path just give meaningful names based on video info, if there are multiple steps to be done after rotating then make sure to return object and return path should be false else return path should be true") 272 | def rotate_video(video_path: str, angle: int, output_name: str, return_path: bool) -> Dict[str, Any]: 273 | try: 274 | # Input validation 275 | if not isinstance(angle, (int, float)): 276 | return { 277 | "success": False, 278 | "error": "Angle must be a number", 279 | "message": "Invalid angle parameter" 280 | } 281 | 282 | output_path = get_output_path(output_name) 283 | video = VideoStore.load(video_path) 284 | rotated_video = rotate(video, angle) 285 | 286 | if return_path: 287 | rotated_video.write_videofile(output_path) 288 | return { 289 | "success": True, 290 | "output_path": output_path, 291 | "message": "Video rotated successfully" 292 | } 293 | else: 294 | ref = VideoStore.store(rotated_video) 295 | return { 296 | "success": True, 297 | "output_object": ref 298 | } 299 | 
except Exception as e: 300 | logger.error(f"Error rotating video {video_path}: {e}") 301 | return { 302 | "success": False, 303 | "error": str(e), 304 | "error_type": type(e).__name__, 305 | "message": "Error rotating video" 306 | } 307 | 308 | @mcp.tool(description="Use this tool for speed up the video, and make sure to provide output name like speed_up_video.mp4 , some_hello.mp4 etc. don't pass path just give meaningful names based on video info, if there are multiple steps to be done after speed up then make sure to return object and return path should be false else return path should be true") 309 | def speed_up_video(video_path: str, speed: float, output_name: str, return_path: bool) -> Dict[str, Any]: 310 | try: 311 | # Input validation 312 | if speed <= 0: 313 | return { 314 | "success": False, 315 | "error": "Speed must be positive (e.g., 2.0 for 2x speed)", 316 | "message": "Invalid speed parameter" 317 | } 318 | 319 | output_path = get_output_path(output_name) 320 | video = VideoStore.load(video_path) 321 | sped_up_video = speedx(video, speed) 322 | 323 | if return_path: 324 | sped_up_video.write_videofile(output_path) 325 | return { 326 | "success": True, 327 | "output_path": output_path, 328 | "message": "Video speed changed successfully" 329 | } 330 | else: 331 | ref = VideoStore.store(sped_up_video) 332 | return { 333 | "success": True, 334 | "output_object": ref 335 | } 336 | except Exception as e: 337 | logger.error(f"Error changing video speed {video_path}: {e}") 338 | return { 339 | "success": False, 340 | "error": str(e), 341 | "error_type": type(e).__name__, 342 | "message": "Error changing video speed" 343 | } 344 | 345 | @mcp.tool(description="Use this tool for adding audio to the video , and make sure to provide output file name like added_audio.mp4 etc. 
make sure mp4 extension is provided and name should be meaningful, if there are multiple steps to be done after adding audio then make sure to return object and return path should be false else return path should be true") 346 | def add_audio(video_path: str, audio_path: str, output_name: str, return_path: bool) -> Dict[str, Any]: 347 | try: 348 | output_path = get_output_path(output_name) 349 | video = VideoStore.load(video_path) 350 | audio = AudioStore.load(audio_path) 351 | new_video = video.set_audio(audio) 352 | 353 | if return_path: 354 | new_video.write_videofile(output_path) 355 | return { 356 | "success": True, 357 | "output_path": output_path, 358 | "message": "Audio added successfully" 359 | } 360 | else: 361 | ref = VideoStore.store(new_video) 362 | return { 363 | "success": True, 364 | "output_object": ref 365 | } 366 | except Exception as e: 367 | logger.error(f"Error adding audio to video {video_path}: {e}") 368 | return { 369 | "success": False, 370 | "error": str(e), 371 | "error_type": type(e).__name__, 372 | "message": "Error adding audio to video" 373 | } 374 | 375 | 376 | @mcp.tool(description="Use this tool for adding fade in effect to video, provide fade_duration in seconds, and output name like fadein_video.mp4, if there are multiple steps to be done after adding fade in then make sure to return object and return path should be false else return path should be true") 377 | def fadein_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]: 378 | try: 379 | # Input validation 380 | if fade_duration <= 0: 381 | return { 382 | "success": False, 383 | "error": "Fade duration must be positive", 384 | "message": "Invalid fade duration parameter" 385 | } 386 | 387 | output_path = get_output_path(output_name) 388 | video = VideoStore.load(video_path) 389 | faded_video = fadein(video, fade_duration) 390 | 391 | if return_path: 392 | faded_video.write_videofile(output_path) 393 | return { 394 | "success": 
True, 395 | "output_path": output_path, 396 | "message": "Fade in effect added successfully" 397 | } 398 | else: 399 | ref = VideoStore.store(faded_video) 400 | return { 401 | "success": True, 402 | "output_object": ref 403 | } 404 | except Exception as e: 405 | logger.error(f"Error adding fade in effect to video {video_path}: {e}") 406 | return { 407 | "success": False, 408 | "error": str(e), 409 | "error_type": type(e).__name__, 410 | "message": "Error adding fade in effect" 411 | } 412 | 413 | @mcp.tool(description="Use this tool for adding fade out effect to video, provide fade_duration in seconds, and output name like fadeout_video.mp4, if there are multiple steps to be done after adding fade out then make sure to return object and return path should be false else return path should be true") 414 | def fadeout_video(video_path: str, fade_duration: float, output_name: str, return_path: bool) -> Dict[str, Any]: 415 | try: 416 | # Input validation 417 | if fade_duration <= 0: 418 | return { 419 | "success": False, 420 | "error": "Fade duration must be positive", 421 | "message": "Invalid fade duration parameter" 422 | } 423 | 424 | output_path = get_output_path(output_name) 425 | video = VideoStore.load(video_path) 426 | faded_video = fadeout(video, fade_duration) 427 | 428 | if return_path: 429 | faded_video.write_videofile(output_path) 430 | return { 431 | "success": True, 432 | "output_path": output_path, 433 | "message": "Fade out effect added successfully" 434 | } 435 | else: 436 | ref = VideoStore.store(faded_video) 437 | return { 438 | "success": True, 439 | "output_object": ref 440 | } 441 | except Exception as e: 442 | logger.error(f"Error adding fade out effect to video {video_path}: {e}") 443 | return { 444 | "success": False, 445 | "error": str(e), 446 | "error_type": type(e).__name__, 447 | "message": "Error adding fade out effect" 448 | } 449 | 450 | @mcp.tool(description="Use this tool for adding text overlay to video, provide text, position 
coordinates (x,y), font_size, color, and output name, if there are multiple steps to be done after adding text overlay then make sure to return object and return path should be false else return path should be true") 451 | def add_text_overlay(video_path: str, text: str, x: int, y: int, font_size: int, color: str, duration: float, output_name: str, return_path: bool, path_of_imagemagick: str) -> Dict[str, Any]: 452 | try: 453 | # Input validation 454 | if not text or not text.strip(): 455 | return { 456 | "success": False, 457 | "error": "Text cannot be empty", 458 | "message": "Invalid text parameter" 459 | } 460 | if font_size <= 0: 461 | return { 462 | "success": False, 463 | "error": "Font size must be positive", 464 | "message": "Invalid font size parameter" 465 | } 466 | if duration <= 0: 467 | return { 468 | "success": False, 469 | "error": "Duration must be positive", 470 | "message": "Invalid duration parameter" 471 | } 472 | 473 | output_path = get_output_path(output_name) 474 | video = VideoStore.load(video_path) 475 | 476 | # Configure ImageMagick 477 | mpy_conf.change_settings({"IMAGEMAGICK_BINARY": path_of_imagemagick}) 478 | 479 | # Create a TextClip with specified parameters 480 | text_clip = TextClip(text, fontsize=font_size, color=color) 481 | 482 | # Set position using the provided x, y coordinates and duration 483 | text_clip = text_clip.set_position((x, y)).set_duration(duration) 484 | 485 | # Overlay text on video 486 | final_video = CompositeVideoClip([video, text_clip]) 487 | 488 | if return_path: 489 | final_video.write_videofile(output_path, fps=final_video.fps) 490 | return { 491 | "success": True, 492 | "output_path": output_path, 493 | "message": "Text overlay added successfully" 494 | } 495 | else: 496 | ref = VideoStore.store(final_video) 497 | return { 498 | "success": True, 499 | "output_object": ref, 500 | "message": "Text overlay added successfully" 501 | } 502 | 503 | except Exception as e: 504 | logger.error(f"Error adding text 
overlay to video {video_path}: {e}") 505 | return { 506 | "success": False, 507 | "error": str(e), 508 | "error_type": type(e).__name__, 509 | "message": "Error adding text overlay. Make sure ImageMagick is installed and path is correct." 510 | } 511 | 512 | @mcp.tool(description="Use this tool for adding image watermark/overlay to video, provide image_path, position coordinates (x,y), and output name, if there are multiple steps to be done after adding image overlay then make sure to return object and return path should be false else return path should be true") 513 | def add_image_overlay(video_path: str, image_path: str, x: int, y: int, duration: float, output_name: str, return_path: bool) -> Dict[str, Any]: 514 | try: 515 | # Input validation 516 | if duration <= 0: 517 | return { 518 | "success": False, 519 | "error": "Duration must be positive", 520 | "message": "Invalid duration parameter" 521 | } 522 | 523 | output_path = get_output_path(output_name) 524 | video = VideoStore.load(video_path) 525 | logo = ImageClip(image_path).set_duration(duration).set_position((x, y)) 526 | final_video = CompositeVideoClip([video, logo]) 527 | 528 | if return_path: 529 | final_video.write_videofile(output_path) 530 | return { 531 | "success": True, 532 | "output_path": output_path, 533 | "message": "Image overlay added successfully" 534 | } 535 | else: 536 | ref = VideoStore.store(final_video) 537 | return { 538 | "success": True, 539 | "output_object": ref 540 | } 541 | except Exception as e: 542 | logger.error(f"Error adding image overlay to video {video_path}: {e}") 543 | return { 544 | "success": False, 545 | "error": str(e), 546 | "error_type": type(e).__name__, 547 | "message": "Error adding image overlay" 548 | } 549 | 550 | @mcp.tool(description="Use this tool for converting video to grayscale/black and white, provide output name like grayscale_video.mp4, if there are multiple steps to be done after converting to grayscale then make sure to return object and return 
path should be false else return path should be true") 551 | def grayscale_video(video_path: str, output_name: str, return_path: bool) -> Dict[str, Any]: 552 | try: 553 | output_path = get_output_path(output_name) 554 | video = VideoStore.load(video_path) 555 | gray_video = video.fx(blackwhite) 556 | 557 | if return_path: 558 | gray_video.write_videofile(output_path) 559 | return { 560 | "success": True, 561 | "output_path": output_path, 562 | "message": "Video converted to grayscale successfully" 563 | } 564 | else: 565 | ref = VideoStore.store(gray_video) 566 | return { 567 | "success": True, 568 | "output_object": ref 569 | } 570 | except Exception as e: 571 | logger.error(f"Error converting video to grayscale {video_path}: {e}") 572 | return { 573 | "success": False, 574 | "error": str(e), 575 | "error_type": type(e).__name__, 576 | "message": "Error converting video to grayscale" 577 | } 578 | 579 | @mcp.tool(description="Use this tool for creating video from image sequence, provide folder path with images, fps, and output name, if there are multiple steps to be done after creating video from images then make sure to return object and return path should be false else return path should be true") 580 | def images_to_video(images_folder_path:str, fps:int, output_name:str, return_path:bool) -> Dict[str,Any]: 581 | try: 582 | output_path = get_output_path(output_name) 583 | clip = ImageSequenceClip(images_folder_path, fps=fps) 584 | if return_path: 585 | clip.write_videofile(output_path) 586 | return { 587 | "success": True, 588 | "output_path": output_path, 589 | "message": "Video created from images successfully" 590 | } 591 | else: 592 | ref = VideoStore.store(clip) 593 | return { 594 | "success": True, 595 | "output_object": ref 596 | } 597 | except Exception as e: 598 | logger.error(f"Error creating video from images {images_folder_path}: {e}") 599 | return { 600 | "success": False, 601 | "error": str(e), 602 | "error_type": type(e).__name__, 603 | "message": 
"Error creating video from images" 604 | } 605 | 606 | @mcp.tool(description="Use this tool for extracting frames from video as images, provide start_time, end_time, and fps for extraction, if there are multiple steps to be done after extracting frames then make sure to return object and return path should be false else return path should be true") 607 | def extract_frames(video_path:str, start_time:float, end_time:float, fps:int, output_folder_name:str, return_path:bool) -> Dict[str,Any]: 608 | try: 609 | video = VideoStore.load(video_path) 610 | subclip = video.subclip(start_time, end_time) 611 | if return_path: 612 | os.makedirs(output_folder_name, exist_ok=True) 613 | for i, frame in enumerate(subclip.iter_frames(fps=fps)): 614 | frame_path = os.path.join(output_folder_name, f"frame_{i:04d}.png") 615 | imageio.imwrite(frame_path, frame) 616 | return { 617 | "success": True, 618 | "output_path": output_folder_name, 619 | "message": "Frames extracted successfully" 620 | } 621 | else: 622 | frames = list(subclip.iter_frames(fps=fps)) 623 | return { 624 | "success": True, 625 | "output_object": frames, 626 | "message": "Frames extracted to memory" 627 | } 628 | except Exception as e: 629 | logger.error(f"Error extracting frames from video {video_path}: {e}") 630 | return { 631 | "success": False, 632 | "error": str(e), 633 | "error_type": type(e).__name__, 634 | "message": "Error extracting frames from video" 635 | } 636 | 637 | @mcp.tool(description="Use this tool for mirroring video horizontally, provide output name like mirrored_video.mp4, if there are multiple steps to be done after mirroring then make sure to return object and return path should be false else return path should be true") 638 | def mirror_video(video_path:str, output_name:str, return_path:bool) -> Dict[str,Any]: 639 | try: 640 | output_path = get_output_path(output_name) 641 | video = VideoStore.load(video_path) 642 | mirrored_video = video.fx(mirror_x) 643 | if return_path: 644 | 
mirrored_video.write_videofile(output_path) 645 | return { 646 | "success": True, 647 | "output_path": output_path, 648 | "message": "Video mirrored successfully" 649 | } 650 | else: 651 | ref = VideoStore.store(mirrored_video) 652 | return { 653 | "success": True, 654 | "output_object": ref 655 | } 656 | except Exception as e: 657 | logger.error(f"Error mirroring video {video_path}: {e}") 658 | return { 659 | "success": False, 660 | "error": str(e), 661 | "error_type": type(e).__name__, 662 | "message": "Error mirroring video" 663 | } 664 | 665 | @mcp.tool(description="Use this tool for splitting video into multiple parts at specific timestamps, provide list of split times in seconds, if there are multiple steps to be done after splitting then make sure to return object and return path should be false else return path should be true") 666 | def split_video_at_times(video_path:str, split_times:List[float], output_name:str, return_path:bool) -> Dict[str,Any]: 667 | try: 668 | output_path = get_output_path(output_name) 669 | video = VideoStore.load(video_path) 670 | segments = [] 671 | split_times = [0] + split_times + [video.duration] 672 | 673 | for i in range(len(split_times) - 1): 674 | start = split_times[i] 675 | end = split_times[i + 1] 676 | segment = video.subclip(start, end) 677 | segments.append(segment) 678 | 679 | if return_path: 680 | output_paths = [] 681 | for i, segment in enumerate(segments): 682 | segment_path = f"{os.path.splitext(output_path)[0]}_part_{i+1}.mp4" # output_path is a file path, so derive sibling files instead of joining onto it 683 | segment.write_videofile(segment_path) 684 | output_paths.append(segment_path) 685 | return { 686 | "success": True, 687 | "output_paths": output_paths, 688 | "message": "Video split successfully" 689 | } 690 | else: 691 | refs = [VideoStore.store(segment) for segment in segments] 692 | return { 693 | "success": True, 694 | "output_objects": refs 695 | } 696 | except Exception as e: 697 | logger.error(f"Error splitting video at times {video_path}: {e}") 698 | return { 699 | 
"success": False, 700 | "error": str(e), 701 | "error_type": type(e).__name__, 702 | "message": "Error splitting video at times" 703 | } 704 | 705 | @mcp.tool(description="Use this tool for converting video format with codec and quality control, provide codec, fps, bitrate, if there are multiple steps to be done after converting video format then make sure to return object and return path should be false else return path should be true") 706 | def convert_video_format(video_path:str, output_name:str, codec:str, fps:Optional[int], bitrate:Optional[str], return_path:bool) -> Dict[str,Any]: 707 | try: 708 | output_path = get_output_path(output_name) 709 | video = VideoStore.load(video_path) 710 | write_kwargs = {"codec": codec} 711 | if fps: 712 | write_kwargs["fps"] = fps 713 | if bitrate: 714 | write_kwargs["bitrate"] = bitrate 715 | 716 | if return_path: 717 | video.write_videofile(output_path, **write_kwargs) 718 | return { 719 | "success": True, 720 | "output_path": output_path, 721 | "message": "Video format converted successfully" 722 | } 723 | else: 724 | ref = VideoStore.store(video) 725 | return { 726 | "success": True, 727 | "output_object": ref 728 | } 729 | except Exception as e: 730 | logger.error(f"Error converting video format {video_path}: {e}") 731 | return { 732 | "success": False, 733 | "error": str(e), 734 | "error_type": type(e).__name__, 735 | "message": "Error converting video format" 736 | } 737 | 738 | @mcp.tool(description="Use this tool for adding video overlay with transparency, provide overlay video, position, and opacity (0-1), if there are multiple steps to be done after adding video overlay then make sure to return object and return path should be false else return path should be true") 739 | def add_video_overlay(base_video_path:str, overlay_video_path:str, x:int, y:int, opacity:float, output_name:str, return_path:bool,duration:float) -> Dict[str,Any]: 740 | try: 741 | output_path = get_output_path(output_name) 742 | base_video = 
VideoStore.load(base_video_path) 743 | overlay_video = VideoStore.load(overlay_video_path) 744 | 745 | overlay_positioned = overlay_video.set_position((x, y)).set_opacity(opacity).set_duration(duration) 746 | final_video = CompositeVideoClip([base_video, overlay_positioned]) 747 | 748 | if return_path: 749 | final_video.write_videofile(output_path) 750 | return { 751 | "success": True, 752 | "output_path": output_path, 753 | "message": "Video overlay added successfully" 754 | } 755 | else: 756 | ref = VideoStore.store(final_video) 757 | return { 758 | "success": True, 759 | "output_object": ref 760 | } 761 | except Exception as e: 762 | logger.error(f"Error adding video overlay {base_video_path}: {e}") 763 | return { 764 | "success": False, 765 | "error": str(e), 766 | "error_type": type(e).__name__, 767 | "message": "Error adding video overlay" 768 | } 769 | 770 | ```
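
Nearly every tool in the two modules above ends with the same branch: when `return_path` is true the result is written to disk and its path returned, otherwise the clip is kept in memory and a reference is returned for the next step. The sketch below isolates that pattern with no MoviePy dependency. `ClipStore` and `finish` are hypothetical names used only for illustration; the real `VideoStore` and `AudioStore` live in `utils.py`, which is not shown here, and may behave differently (for example, by opening a clip from disk when given a path).

```python
import uuid
from typing import Any, Callable, Dict


class ClipStore:
    """Hypothetical in-memory store standing in for VideoStore/AudioStore."""

    _objects: Dict[str, Any] = {}

    @classmethod
    def store(cls, obj: Any) -> str:
        # Keep the object in memory and hand back an opaque reference string.
        ref = uuid.uuid4().hex
        cls._objects[ref] = obj
        return ref

    @classmethod
    def load(cls, ref_or_path: str) -> Any:
        # Assumption: unknown strings are treated as file paths and returned as-is.
        return cls._objects.get(ref_or_path, ref_or_path)


def finish(result: Any, return_path: bool, output_path: str,
           write: Callable[[Any, str], None], message: str) -> Dict[str, Any]:
    """Write the result to disk, or store it and return a reference for chaining."""
    if return_path:
        write(result, output_path)
        return {"success": True, "output_path": output_path, "message": message}
    return {"success": True, "output_object": ClipStore.store(result), "message": message}


# Two chained steps: the first stays in memory, the second is "written" to disk.
written = []
step1 = finish("clip-A", False, "unused.mp4", lambda clip, path: written.append(path), "Trimmed")
clip = ClipStore.load(step1["output_object"])
step2 = finish(clip + "-faded", True, "final.mp4", lambda clip, path: written.append(path), "Faded")
```

Returning a string reference rather than the clip itself keeps every tool response JSON-serializable, which is presumably why most tools call `store` before returning.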