This is page 1 of 2. Use http://codebase.md/vladimir-tutin/plex-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .env.example
├── .gitignore
├── .vscode
│ └── launch.json
├── modules
│ ├── __init__.py
│ ├── client.py
│ ├── collection.py
│ ├── library.py
│ ├── media.py
│ ├── playlist.py
│ ├── search.py
│ ├── server.py
│ ├── sessions.py
│ └── user.py
├── plex_mcp_server.py
├── README.md
├── requirements.txt
└── watcher.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | .env
2 | __pycache__
3 | /modules/__pycache__
4 | .venv
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
1 | PLEX_URL=https://app.plex.tv
2 | PLEX_TOKEN=TOKEN
3 | PLEX_USERNAME=USERNAME
4 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Plex MCP Server
2 |
3 | A powerful Model-Controller-Protocol server for interacting with Plex Media Server, providing a standardized JSON-based interface for automation, scripting, and integration with other tools.
4 |
5 | ## Overview
6 |
7 | Plex MCP Server creates a unified API layer on top of the Plex Media Server API, offering:
8 |
9 | - **Standardized JSON responses** for compatibility with automation tools, AI systems, and other integrations
10 | - **Multiple transport methods** (stdio and SSE) for flexible integration options
11 | - **Rich command set** for managing libraries, collections, playlists, media, users, and more
12 | - **Error handling** with consistent response formats
13 | - **Easy integration** with automation platforms (like n8n) and custom scripts
14 |
15 | ## Requirements
16 |
17 | - Python 3.8+
18 | - Plex Media Server with valid authentication token
19 | - Access to the Plex server (locally or remotely)
20 |
21 | ## Installation
22 |
23 | 1. Clone this repository
24 | 2. Install the required dependencies:
25 | ```
26 | pip install -r requirements.txt
27 | ```
28 | 3. Create a `.env` file based on the `.env.example`:
29 | ```
30 | cp .env.example .env
31 | ```
32 | 4. Add your Plex server URL and token to the `.env` file:
33 | ```
34 | PLEX_URL=http://your-plex-server:32400
35 | PLEX_TOKEN=your-plex-token
36 | ```
37 |
38 | ## Usage
39 |
40 | The server can be run in two transport modes: stdio (Standard Input/Output) or SSE (Server-Sent Events). Each mode is suitable for different integration scenarios.
41 |
42 | ### Running with stdio Transport
43 |
44 | The stdio transport is ideal for direct integration with applications like Claude Desktop or Cursor. It accepts commands via standard input and outputs results to standard output in JSON format.
45 |
46 | Basic command line usage:
47 | ```bash
48 | python3 -m plex_mcp
49 | ```
50 | or
51 | ```bash
52 | python3 plex_mcp_server.py --transport stdio
53 | ```
54 |
55 | #### Configuration Example for Claude Desktop/Cursor
56 | Add this configuration to your application's settings:
57 | ```json
58 | {
59 | "plex": {
60 | "command": "python",
61 | "args": [
62 | "C://Users//User//Documents//plex-mcp-server//plex_mcp_server.py",
63 | "--transport=stdio"
64 | ],
65 | "env": {
66 | "PLEX_URL":"http://localhost:32400",
67 | "PLEX_TOKEN":"av3khi56h634v3",
68 | "PLEX_USERNAME:"Administrator"
69 | }
70 | }
71 | }
72 | ```
73 |
74 | ### Running with SSE Transport
75 |
76 | The Server-Sent Events (SSE) transport provides a web-based interface for integration with web applications and services.
77 |
78 | Start the server:
79 | ```bash
80 | python3 plex_mcp_server.py --transport sse --host 0.0.0.0 --port 3001
81 | ```
82 |
83 | Default options:
84 | - Host: 0.0.0.0 (accessible from any network interface)
85 | - Port: 3001
86 | - SSE endpoint: `/sse`
87 | - Message endpoint: `/messages/`
88 |
89 | #### Configuration Example for SSE Client
90 | When the server is running in SSE mode, configure your client to connect using:
91 | ```json
92 | {
93 | "plex": {
94 | "url": "http://localhost:3001/sse"
95 | }
96 | }
97 | ```
98 |
99 | With SSE, you can connect to the server via web applications or tools that support SSE connections.
100 |
101 | ## Command Modules
102 |
103 | ### Library Module
104 | - List libraries
105 | - Get library statistics
106 | - Refresh libraries
107 | - Scan for new content
108 | - Get library details
109 | - Get recently added content
110 | - Get library contents
111 |
112 | ### Media Module
113 | - Search for media
114 | - Get detailed media information
115 | - Edit media metadata
116 | - Delete media
117 | - Get and set artwork
118 | - List available artwork
119 |
120 | ### Playlist Module
121 | - List playlists
122 | - Get playlist contents
123 | - Create playlists
124 | - Delete playlists
125 | - Add items to playlists
126 | - Remove items from playlists
127 | - Edit playlists
128 | - Upload custom poster images
129 | - Copy playlists to other users
130 |
131 | ### Collection Module
132 | - List collections
133 | - Create collections
134 | - Add items to collections
135 | - Remove items from collections
136 | - Edit collections
137 |
138 | ### User Module
139 | - Search for users
140 | - Get user information
141 | - Get user's on deck content
142 | - Get user watch history
143 |
144 | ### Sessions Module
145 | - Get active sessions
146 | - Get media playback history
147 |
148 | ### Server Module
149 | - Get Plex server logs
150 | - Get server information
151 | - Get bandwidth statistics
152 | - Get current resource usage
153 | - Get and run butler tasks
154 | - Get server alerts
155 |
156 | ### Client Module
157 | - List clients
158 | - Get client details
159 | - Get client timelines
160 | - Get active clients
161 | - Start media playback
162 | - Control playback (play, pause, etc.)
163 | - Navigate client interfaces
164 | - Set audio/subtitle streams
165 |
166 | **Note:** The Client Module functionality is currently limited and not fully implemented. Some features may not work as expected or may be incomplete.
167 |
168 | ## Response Format
169 |
170 | All commands return standardized JSON responses for maximum compatibility with various tools, automation platforms, and AI systems. This consistent structure makes it easy to process responses programmatically.
171 |
172 | For successful operations, the response typically includes:
173 | ```json
174 | {
175 | "success_field": true,
176 | "relevant_data": "value",
177 | "additional_info": {}
178 | }
179 | ```
180 |
181 | For errors, the response format is:
182 | ```json
183 | {
184 | "error": "Error message describing what went wrong"
185 | }
186 | ```
187 |
188 | For multiple matches (when searching by title), results are returned as an array of objects with identifying information:
189 | ```json
190 | [
191 | {
192 | "title": "Item Title",
193 | "id": 12345,
194 | "type": "movie",
195 | "year": 2023
196 | },
197 | {
198 | "title": "Another Item",
199 | "id": 67890,
200 | "type": "show",
201 | "year": 2022
202 | }
203 | ]
204 | ```
205 |
206 | ## Debugging
207 |
208 | For development and debugging, you can use the included `watcher.py` script which monitors for changes and automatically restarts the server.
209 |
210 | ## License
211 |
212 | [Include your license information here]
213 |
```
--------------------------------------------------------------------------------
/modules/search.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 | from typing import Optional
3 | from modules import mcp, connect_to_plex
4 |
5 |
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | plexapi>=4.15.0
2 | starlette>=0.28.0
3 | uvicorn>=0.23.0
4 | requests>=2.31.0
5 | python-dotenv>=1.0.0
6 | mcp
```
--------------------------------------------------------------------------------
/.vscode/launch.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "configurations": [
3 |
4 | {
5 | "name": "Python Debugger: Python File",
6 | "type": "debugpy",
7 | "request": "launch",
8 | "program": "${file}"
9 | }
10 | ]
11 | }
```
--------------------------------------------------------------------------------
/modules/__init__.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | import time
3 | from mcp.server.fastmcp import FastMCP # type: ignore
4 | from plexapi.server import PlexServer # type: ignore
5 | from plexapi.myplex import MyPlexAccount # type: ignore
6 |
7 | # Add dotenv for .env file support
8 | try:
9 | from dotenv import load_dotenv # type: ignore
10 | # Load environment variables from .env file
11 | load_dotenv()
12 | print("Successfully loaded environment variables from .env file")
13 | except ImportError:
14 | print("Warning: python-dotenv not installed. Environment variables won't be loaded from .env file.")
15 | print("Install with: pip install python-dotenv")
16 |
17 | # Initialize FastMCP server
18 | mcp = FastMCP("plex-server")
19 |
20 | # Global variables for Plex connection
21 | plex_url = os.environ.get("PLEX_URL", "")
22 | plex_token = os.environ.get("PLEX_TOKEN", "")
23 | server = None
24 | last_connection_time = 0
25 | CONNECTION_TIMEOUT = 30 # seconds
26 | SESSION_TIMEOUT = 60 * 30 # 30 minutes
27 |
28 | def connect_to_plex() -> PlexServer:
29 | """Connect to Plex server using environment variables or stored credentials.
30 |
31 | Returns a PlexServer instance with automatic reconnection if needed.
32 | """
33 | global server, last_connection_time
34 | current_time = time.time()
35 |
36 | # Check if we have a valid connection
37 | if server is not None:
38 | # If we've connected recently, reuse the connection
39 | if current_time - last_connection_time < SESSION_TIMEOUT:
40 | # Verify the connection is still alive with a simple request
41 | try:
42 | # Simple API call to verify the connection
43 | server.library.sections()
44 | last_connection_time = current_time
45 | return server
46 | except:
47 | # Connection failed, reset and create a new one
48 | server = None
49 |
50 | # Create a new connection
51 | max_retries = 3
52 | retry_delay = 2 # seconds
53 |
54 | for attempt in range(max_retries):
55 | try:
56 | # Try connecting directly with a token
57 | if plex_token:
58 | server = PlexServer(plex_url, plex_token, timeout=CONNECTION_TIMEOUT)
59 | last_connection_time = current_time
60 | return server
61 |
62 | # If no direct connection, try with MyPlex account
63 | username = os.environ.get("PLEX_USERNAME")
64 | password = os.environ.get("PLEX_PASSWORD")
65 | server_name = os.environ.get("PLEX_SERVER_NAME")
66 |
67 | if username and password and server_name:
68 | account = MyPlexAccount(username, password)
69 | # Use the plex_token if available to avoid resource.connect()
70 | # which can be problematic
71 | for resource in account.resources():
72 | if resource.name.lower() == server_name.lower() and resource.provides == 'server':
73 | if resource.connections:
74 | # Try each connection until one works
75 | for connection in resource.connections:
76 | try:
77 | server = PlexServer(connection.uri, account.authenticationToken, timeout=CONNECTION_TIMEOUT)
78 | last_connection_time = current_time
79 | return server
80 | except:
81 | continue
82 |
83 | # If we get here, none of the connection attempts worked
84 | # Fall back to resource.connect() as a last resort
85 | server = account.resource(server_name).connect(timeout=CONNECTION_TIMEOUT)
86 | last_connection_time = current_time
87 | return server
88 |
89 | raise ValueError("Insufficient Plex credentials provided")
90 |
91 | except Exception as e:
92 | if attempt == max_retries - 1: # Last attempt failed
93 | raise ValueError(f"Failed to connect to Plex after {max_retries} attempts: {str(e)}")
94 |
95 | # Wait before retrying
96 | time.sleep(retry_delay)
97 |
98 | # We shouldn't get here but just in case
99 | raise ValueError("Failed to connect to Plex server")
100 |
```
--------------------------------------------------------------------------------
/plex_mcp_server.py:
--------------------------------------------------------------------------------
```python
1 | import argparse
2 | import uvicorn # type: ignore
3 | from starlette.applications import Starlette # type: ignore
4 | from starlette.routing import Mount, Route # type: ignore
5 | from mcp.server import Server # type: ignore
6 | from mcp.server.sse import SseServerTransport # type: ignore
7 | from starlette.requests import Request # type: ignore
8 |
9 | # Import the main mcp instance from modules
10 | from modules import mcp, connect_to_plex
11 |
12 | # Import all tools to ensure they are registered with MCP
13 | # Library module functions
14 | from modules.library import (
15 | library_list,
16 | library_get_stats,
17 | library_refresh,
18 | library_scan,
19 | library_get_details,
20 | library_get_recently_added,
21 | library_get_contents
22 | )
23 | # User module functions
24 | from modules.user import (
25 | user_search_users,
26 | user_get_info,
27 | user_get_on_deck,
28 | user_get_watch_history,
29 | user_get_statistics
30 | )
31 | # Search module functions
32 | from modules.sessions import (
33 | sessions_get_active,
34 | sessions_get_media_playback_history
35 | )
36 | # Server module functions
37 | from modules.server import (
38 | server_get_plex_logs,
39 | server_get_info,
40 | server_get_bandwidth,
41 | server_get_current_resources,
42 | server_get_butler_tasks,
43 | server_get_alerts,
44 | server_run_butler_task
45 | )
46 | # Playlist module functions
47 | from modules.playlist import (
48 | playlist_list,
49 | playlist_get_contents,
50 | playlist_create,
51 | playlist_delete,
52 | playlist_add_to,
53 | playlist_remove_from,
54 | playlist_edit,
55 | playlist_upload_poster,
56 | playlist_copy_to_user
57 | )
58 | # Collection module functions
59 | from modules.collection import (
60 | collection_list,
61 | collection_create,
62 | collection_add_to,
63 | collection_remove_from,
64 | collection_edit
65 | )
66 | # Media module functions
67 | from modules.media import (
68 | media_search,
69 | media_get_details,
70 | media_edit_metadata,
71 | media_delete,
72 | media_get_artwork,
73 | media_set_artwork,
74 | media_list_available_artwork
75 | )
76 | # Client module functions
77 | from modules.client import (
78 | client_list,
79 | client_get_details,
80 | client_get_timelines,
81 | client_get_active,
82 | client_start_playback,
83 | client_control_playback,
84 | client_navigate,
85 | client_set_streams
86 | )
87 |
88 | def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
89 | """Create a Starlette application that can serve the provided mcp server with SSE."""
90 | sse = SseServerTransport("/messages/")
91 |
92 | async def handle_sse(request: Request) -> None:
93 | async with sse.connect_sse(
94 | request.scope,
95 | request.receive,
96 | request._send, # noqa: SLF001
97 | ) as (read_stream, write_stream):
98 | await mcp_server.run(
99 | read_stream,
100 | write_stream,
101 | mcp_server.create_initialization_options(),
102 | )
103 |
104 | return Starlette(
105 | debug=debug,
106 | routes=[
107 | Route("/sse", endpoint=handle_sse),
108 | Mount("/messages/", app=sse.handle_post_message),
109 | ],
110 | )
111 |
112 | if __name__ == "__main__":
113 | # Setup command line arguments
114 | parser = argparse.ArgumentParser(description='Run Plex MCP Server')
115 | parser.add_argument('--transport', choices=['stdio', 'sse'], default='sse',
116 | help='Transport method to use (stdio or sse)')
117 | parser.add_argument('--host', default='0.0.0.0', help='Host to bind to (for SSE)')
118 | parser.add_argument('--port', type=int, default=3001, help='Port to listen on (for SSE)')
119 | parser.add_argument('--debug', action='store_true', help='Enable debug mode')
120 |
121 | args = parser.parse_args()
122 |
123 | # Initialize and run the server
124 | print(f"Starting Plex MCP Server with {args.transport} transport...")
125 | print("Set PLEX_URL and PLEX_TOKEN environment variables for connection")
126 |
127 | if args.transport == 'stdio':
128 | # Run with stdio transport (original method)
129 | mcp.run(transport='stdio')
130 | else:
131 | # Run with SSE transport
132 | mcp_server = mcp._mcp_server # Access the underlying MCP server
133 | starlette_app = create_starlette_app(mcp_server, debug=args.debug)
134 | print(f"Starting SSE server on http://{args.host}:{args.port}")
135 | print("Access the SSE endpoint at /sse")
136 | uvicorn.run(starlette_app, host=args.host, port=args.port)
```
--------------------------------------------------------------------------------
/watcher.py:
--------------------------------------------------------------------------------
```python
1 | import time
2 | import os
3 | import sys
4 | import subprocess
5 | import argparse
6 | from watchdog.observers import Observer
7 | from watchdog.events import FileSystemEventHandler
8 |
9 | # Default paths and configuration
10 | SERVER_PATH = os.getcwd() # Current working directory
11 | MODULES_PATH = os.path.join(SERVER_PATH, "modules") # Modules subdirectory
12 | SERVER_MODULE = "plex_mcp_server" # Correct module name
13 |
14 | class MCPServerHandler(FileSystemEventHandler):
15 | def __init__(self, transport=None, host=None, port=None):
16 | self.process = None
17 | self.transport = transport
18 | self.host = host
19 | self.port = port
20 | self.start_server()
21 |
22 | def start_server(self):
23 | if self.process:
24 | print("Forcefully stopping server...")
25 | try:
26 | # First try SIGTERM
27 | self.process.terminate()
28 |
29 | # Give it a short time to terminate
30 | for _ in range(3):
31 | if self.process.poll() is not None:
32 | break # Process terminated
33 | time.sleep(0.1)
34 |
35 | # If still running, force kill
36 | if self.process.poll() is None:
37 | print("Server still running, killing forcefully...")
38 | self.process.kill()
39 |
40 | # Wait for process to be fully killed
41 | self.process.wait()
42 | except Exception as e:
43 | print(f"Error stopping server: {e}")
44 |
45 | # In case the process is still running, try one more approach (platform specific)
46 | try:
47 | if self.process.poll() is None and hasattr(os, 'killpg'):
48 | import signal
49 | os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
50 | except Exception:
51 | pass
52 |
53 | command = [sys.executable, "-m", SERVER_MODULE]
54 |
55 | # Add command line arguments if provided
56 | if self.transport:
57 | command.extend(["--transport", self.transport])
58 | if self.host:
59 | command.extend(["--host", self.host])
60 | if self.port:
61 | command.extend(["--port", str(self.port)])
62 |
63 | print(f"Starting server with command: {' '.join(command)}")
64 | # Create the process in its own process group so we can kill it and all its children
65 | if hasattr(subprocess, 'CREATE_NEW_PROCESS_GROUP') and sys.platform == 'win32':
66 | # Windows-specific flag
67 | self.process = subprocess.Popen(
68 | command,
69 | cwd=SERVER_PATH,
70 | creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
71 | )
72 | else:
73 | # Unix-based systems
74 | self.process = subprocess.Popen(
75 | command,
76 | cwd=SERVER_PATH,
77 | preexec_fn=os.setsid if hasattr(os, 'setsid') else None
78 | )
79 |
80 | def on_modified(self, event):
81 | if event.src_path.endswith('.py'):
82 | print(f"Change detected in {event.src_path}")
83 | self.start_server()
84 |
85 | if __name__ == "__main__":
86 | # Parse command line arguments
87 | parser = argparse.ArgumentParser(description="Watch for changes in MCP server files and restart the server")
88 | parser.add_argument("--transport", help="Transport type (e.g., http, websocket)")
89 | parser.add_argument("--host", help="Host address to bind to")
90 | parser.add_argument("--port", help="Port to bind to")
91 | args = parser.parse_args()
92 |
93 | # Create event handler with provided arguments
94 | event_handler = MCPServerHandler(
95 | transport=args.transport,
96 | host=args.host,
97 | port=args.port
98 | )
99 |
100 | # Set up observers for both main directory and modules subdirectory
101 | observer = Observer()
102 | observer.schedule(event_handler, SERVER_PATH, recursive=False)
103 |
104 | # Make sure modules directory exists before watching it
105 | if os.path.exists(MODULES_PATH) and os.path.isdir(MODULES_PATH):
106 | observer.schedule(event_handler, MODULES_PATH, recursive=True)
107 | else:
108 | print(f"Warning: Modules directory {MODULES_PATH} not found, only watching main directory")
109 |
110 | observer.start()
111 |
112 | print(f"Watching for changes in {SERVER_PATH} and {MODULES_PATH} (if exists)")
113 |
114 | try:
115 | while True:
116 | time.sleep(1)
117 | except KeyboardInterrupt:
118 | print("Stopping watcher...")
119 | observer.stop()
120 | if event_handler.process:
121 | print("Forcefully stopping server...")
122 | try:
123 | # Try SIGTERM first
124 | event_handler.process.terminate()
125 |
126 | # Give it a short time to terminate
127 | for _ in range(3):
128 | if event_handler.process.poll() is not None:
129 | break
130 | time.sleep(0.1)
131 |
132 | # If still running, force kill
133 | if event_handler.process.poll() is None:
134 | print("Server still running, killing forcefully...")
135 | event_handler.process.kill()
136 |
137 | # Try process group kill as a last resort
138 | if event_handler.process.poll() is None and hasattr(os, 'killpg'):
139 | import signal
140 | os.killpg(os.getpgid(event_handler.process.pid), signal.SIGKILL)
141 | except Exception as e:
142 | print(f"Error while stopping server: {e}")
143 | observer.join()
```
--------------------------------------------------------------------------------
/modules/sessions.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 | from typing import Optional
3 | from modules import mcp, connect_to_plex
4 |
5 | # Functions for sessions and playback
6 | @mcp.tool()
7 | async def sessions_get_active(unused: str = None) -> str:
8 | """Get information about current playback sessions, including IP addresses.
9 |
10 | Args:
11 | unused: Unused parameter to satisfy the function signature
12 | """
13 | try:
14 | plex = connect_to_plex()
15 |
16 | # Get all active sessions
17 | sessions = plex.sessions()
18 |
19 | if not sessions:
20 | return json.dumps({
21 | "status": "success",
22 | "message": "No active sessions found.",
23 | "sessions_count": 0,
24 | "sessions": []
25 | })
26 |
27 | sessions_data = []
28 | transcode_count = 0
29 | direct_play_count = 0
30 | total_bitrate = 0
31 |
32 | for session in enumerate(sessions, 1):
33 | i, session = session
34 | # Basic media information
35 | item_type = getattr(session, 'type', 'unknown')
36 | title = getattr(session, 'title', 'Unknown')
37 |
38 | # Session information
39 | player = getattr(session, 'player', None)
40 | user = getattr(session, 'usernames', ['Unknown User'])[0]
41 |
42 | session_info = {
43 | "session_id": i,
44 | "state": player.state,
45 | "player_name": player.title,
46 | "user": user,
47 | "content_type": item_type,
48 | "player": {},
49 | "progress": {}
50 | }
51 |
52 | # Media-specific information
53 | if item_type == 'episode':
54 | show_title = getattr(session, 'grandparentTitle', 'Unknown Show')
55 | season_num = getattr(session, 'parentIndex', '?')
56 | episode_num = getattr(session, 'index', '?')
57 | session_info["content_description"] = f"{show_title} - S{season_num}E{episode_num} - {title} (TV Episode)"
58 |
59 | elif item_type == 'movie':
60 | year = getattr(session, 'year', '')
61 | session_info["year"] = year
62 | session_info["content_description"] = f"{title} ({year}) (Movie)"
63 |
64 | else:
65 | session_info["content_description"] = f"{title} ({item_type})"
66 |
67 | # Player information
68 | if player:
69 | player_info = {
70 | }
71 |
72 | # Add IP address if available
73 | if hasattr(player, 'address'):
74 | player_info["ip"] = player.address
75 |
76 | # Add platform information if available
77 | if hasattr(player, 'platform'):
78 | player_info["platform"] = player.platform
79 |
80 | # Add product information if available
81 | if hasattr(player, 'product'):
82 | player_info["product"] = player.product
83 |
84 | # Add device information if available
85 | if hasattr(player, 'device'):
86 | player_info["device"] = player.device
87 |
88 | # Add version information if available
89 | if hasattr(player, 'version'):
90 | player_info["version"] = player.version
91 |
92 | session_info["player"] = player_info
93 |
94 | # Add playback information
95 | if hasattr(session, 'viewOffset') and hasattr(session, 'duration'):
96 | progress = (session.viewOffset / session.duration) * 100
97 | seconds_remaining = (session.duration - session.viewOffset) / 1000
98 | minutes_remaining = seconds_remaining / 60
99 |
100 | session_info["progress"] = {
101 | "percent": round(progress, 1),
102 | "minutes_remaining": int(minutes_remaining) if minutes_remaining > 1 else 0
103 | }
104 |
105 | # Add quality information if available
106 | if hasattr(session, 'media') and session.media:
107 | media = session.media[0] if isinstance(session.media, list) and session.media else session.media
108 | media_info = {}
109 |
110 | bitrate = getattr(media, 'bitrate', None)
111 | if bitrate:
112 | media_info["bitrate"] = f"{bitrate} kbps"
113 | # Add to total bitrate
114 | try:
115 | total_bitrate += int(bitrate)
116 | except (TypeError, ValueError):
117 | pass
118 |
119 | resolution = getattr(media, 'videoResolution', None)
120 | if resolution:
121 | media_info["resolution"] = resolution
122 |
123 | if media_info:
124 | session_info["media_info"] = media_info
125 |
126 | # Transcoding information
127 | transcode_session = getattr(session, 'transcodeSessions', None)
128 | if transcode_session:
129 | transcode = transcode_session[0] if isinstance(transcode_session, list) else transcode_session
130 |
131 | transcode_info = {"active": True}
132 |
133 | # Add source vs target information if available
134 | if hasattr(transcode, 'sourceVideoCodec') and hasattr(transcode, 'videoCodec'):
135 | transcode_info["video"] = f"{transcode.sourceVideoCodec} → {transcode.videoCodec}"
136 |
137 | if hasattr(transcode, 'sourceAudioCodec') and hasattr(transcode, 'audioCodec'):
138 | transcode_info["audio"] = f"{transcode.sourceAudioCodec} → {transcode.audioCodec}"
139 |
140 | if hasattr(transcode, 'sourceResolution') and hasattr(transcode, 'width') and hasattr(transcode, 'height'):
141 | transcode_info["resolution"] = f"{transcode.sourceResolution} → {transcode.width}x{transcode.height}"
142 |
143 | session_info["transcoding"] = transcode_info
144 | transcode_count += 1
145 | else:
146 | session_info["transcoding"] = {"active": False, "mode": "Direct Play/Stream"}
147 | direct_play_count += 1
148 |
149 | sessions_data.append(session_info)
150 |
151 | return json.dumps({
152 | "status": "success",
153 | "message": f"Found {len(sessions)} active sessions",
154 | "sessions_count": len(sessions),
155 | "transcode_count": transcode_count,
156 | "direct_play_count": direct_play_count,
157 | "total_bitrate_kbps": total_bitrate,
158 | "sessions": sessions_data
159 | }, indent=2)
160 | except Exception as e:
161 | return json.dumps({
162 | "status": "error",
163 | "message": f"Error getting active sessions: {str(e)}"
164 | })
165 |
166 | @mcp.tool()
167 | async def sessions_get_media_playback_history(media_title: str = None, library_name: str = None, media_id: int = None) -> str:
168 | """Get playback history for a specific media item.
169 |
170 | Args:
171 | media_title: Title of the media to get history for (optional if media_id is provided)
172 | library_name: Optional library name to limit search to
173 | media_id: Plex media ID/rating key to directly fetch the item (optional if media_title is provided)
174 | """
175 | try:
176 | plex = connect_to_plex()
177 |
178 | # Check if we have at least one identifier
179 | if not media_title and not media_id:
180 | return json.dumps({
181 | "status": "error",
182 | "message": "Either media_title or media_id must be provided."
183 | })
184 |
185 | media = None
186 | results = []
187 |
188 | # If media_id is provided, try to fetch the item directly
189 | if media_id:
190 | try:
191 | # fetchItem takes a rating key and returns the media object
192 | media = plex.fetchItem(media_id)
193 | except Exception as e:
194 | return json.dumps({
195 | "status": "error",
196 | "message": f"Media with ID '{media_id}' not found: {str(e)}"
197 | })
198 | # Otherwise search by title
199 | elif media_title:
200 | if library_name:
201 | try:
202 | library = plex.library.section(library_name)
203 | results = library.search(title=media_title)
204 | except Exception:
205 | return json.dumps({
206 | "status": "error",
207 | "message": f"Library '{library_name}' not found."
208 | })
209 | else:
210 | results = plex.search(media_title)
211 |
212 | if not results:
213 | return json.dumps({
214 | "status": "error",
215 | "message": f"No media found matching '{media_title}'."
216 | })
217 |
218 | # If we have multiple results, provide details about each match
219 | if len(results) > 1:
220 | matches = []
221 | for item in results:
222 | item_info = {
223 | "media_id": item.ratingKey,
224 | "type": getattr(item, 'type', 'unknown'),
225 | "title": item.title
226 | }
227 |
228 | # Add type-specific info
229 | if item.type == 'episode':
230 | item_info["show_title"] = getattr(item, 'grandparentTitle', 'Unknown Show')
231 | item_info["season"] = getattr(item, 'parentTitle', 'Unknown Season')
232 | item_info["season_number"] = getattr(item, 'parentIndex', '?')
233 | item_info["episode_number"] = getattr(item, 'index', '?')
234 | item_info["formatted_title"] = f"{item_info['show_title']} - S{item_info['season_number']}E{item_info['episode_number']} - {item.title}"
235 | elif item.type == 'movie':
236 | year = getattr(item, 'year', '')
237 | if year:
238 | item_info["year"] = year
239 | item_info["formatted_title"] = f"{item.title} ({year})" if year else item.title
240 |
241 | matches.append(item_info)
242 |
243 | return json.dumps({
244 | "status": "multiple_matches",
245 | "message": f"Multiple items found with title '{media_title}'. Please specify a library, use a more specific title, or use one of the media_id values below.",
246 | "matches": matches
247 | }, indent=2)
248 |
249 | media = results[0]
250 |
251 | media_type = getattr(media, 'type', 'unknown')
252 |
253 | # Format title differently based on media type
254 | media_info = {
255 | "media_id": media.ratingKey,
256 | "key": media.key
257 | }
258 |
259 | if media_type == 'episode':
260 | show = getattr(media, 'grandparentTitle', 'Unknown Show')
261 | season = getattr(media, 'parentTitle', 'Unknown Season')
262 | formatted_title = f"{show} - {season} - {media.title}"
263 | media_info["show_title"] = show
264 | media_info["season_title"] = season
265 | media_info["episode_title"] = media.title
266 | else:
267 | year = getattr(media, 'year', '')
268 | year_str = f" ({year})" if year else ""
269 | formatted_title = f"{media.title}{year_str}"
270 | media_info["title"] = media.title
271 | if year:
272 | media_info["year"] = year
273 |
274 | media_info["type"] = media_type
275 | media_info["formatted_title"] = formatted_title
276 |
277 | # Get the history using the history() method
278 | try:
279 | history_items = media.history()
280 |
281 | if not history_items:
282 | return json.dumps({
283 | "status": "success",
284 | "message": f"No playback history found for '{formatted_title}'.",
285 | "media": media_info,
286 | "play_count": 0,
287 | "history": []
288 | })
289 |
290 | history_data = []
291 |
292 | for item in history_items:
293 | history_entry = {}
294 |
295 | # Get the username if available
296 | account_id = getattr(item, 'accountID', None)
297 | account_name = "Unknown User"
298 |
299 | # Try to get the account name from the accountID
300 | if account_id:
301 | try:
302 | # This may not work unless we have admin privileges
303 | account = plex.myPlexAccount()
304 | if account.id == account_id:
305 | account_name = account.title
306 | else:
307 | for user in account.users():
308 | if user.id == account_id:
309 | account_name = user.title
310 | break
311 | except:
312 | # If we can't get the account name, just use the ID
313 | account_name = f"User ID: {account_id}"
314 |
315 | history_entry["user"] = account_name
316 |
317 | # Get the timestamp when it was viewed
318 | viewed_at = getattr(item, 'viewedAt', None)
319 | viewed_at_str = viewed_at.strftime("%Y-%m-%d %H:%M") if viewed_at else "Unknown time"
320 | history_entry["viewed_at"] = viewed_at_str
321 |
322 | # Device information if available
323 | device_id = getattr(item, 'deviceID', None)
324 | device_name = "Unknown Device"
325 |
326 | # Try to resolve device name using systemDevice method
327 | if device_id:
328 | try:
329 | device = plex.systemDevice(device_id)
330 | if device and hasattr(device, 'name'):
331 | device_name = device.name
332 | except Exception:
333 | # If we can't resolve the device name, just use the ID
334 | device_name = f"Device ID: {device_id}"
335 |
336 | history_entry["device"] = device_name
337 | history_data.append(history_entry)
338 |
339 | return json.dumps({
340 | "status": "success",
341 | "media": media_info,
342 | "play_count": len(history_items),
343 | "history": history_data
344 | }, indent=2)
345 |
346 | except AttributeError:
347 | # Fallback if history() method is not available
348 | # Get basic view information
349 | view_count = getattr(media, 'viewCount', 0) or 0
350 | last_viewed_at = getattr(media, 'lastViewedAt', None)
351 |
352 | if view_count == 0:
353 | return json.dumps({
354 | "status": "success",
355 | "message": f"No one has watched '{formatted_title}' yet.",
356 | "media": media_info,
357 | "play_count": 0
358 | })
359 |
360 | result = {
361 | "status": "success",
362 | "media": media_info,
363 | "play_count": view_count,
364 | }
365 |
366 | if last_viewed_at:
367 | last_viewed_str = last_viewed_at.strftime("%Y-%m-%d %H:%M") if hasattr(last_viewed_at, 'strftime') else str(last_viewed_at)
368 | result["last_viewed"] = last_viewed_str
369 |
370 | # Add any additional account info if available
371 | account_info = getattr(media, 'viewedBy', [])
372 | if account_info:
373 | result["viewed_by"] = [account.title for account in account_info]
374 |
375 | return json.dumps(result, indent=2)
376 |
377 | except Exception as e:
378 | return json.dumps({
379 | "status": "error",
380 | "message": f"Error getting media playback history: {str(e)}"
381 | })
```
--------------------------------------------------------------------------------
/modules/server.py:
--------------------------------------------------------------------------------
```python
1 | from modules import mcp, connect_to_plex
2 | import os
3 | from typing import Dict, List, Any, Optional
4 | import json
5 | import asyncio
6 | import requests
7 |
8 | @mcp.tool()
9 | async def server_get_plex_logs(num_lines: int = 100, log_type: str = "server") -> str:
10 | """Get Plex server logs.
11 |
12 | Args:
13 | num_lines: Number of log lines to retrieve
14 | log_type: Type of log to retrieve (server, scanner, transcoder, updater)
15 | """
16 | try:
17 | import zipfile
18 | import io
19 | import tempfile
20 | import os
21 | import shutil
22 | import traceback
23 |
24 | plex = connect_to_plex()
25 |
26 | # Map common log type names to the actual file names
27 | log_type_map = {
28 | 'server': 'Plex Media Server.log',
29 | 'scanner': 'Plex Media Scanner.log',
30 | 'transcoder': 'Plex Transcoder.log',
31 | 'updater': 'Plex Update Service.log'
32 | }
33 |
34 | log_file_name = log_type_map.get(log_type.lower(), log_type)
35 |
36 | # Download logs from the Plex server
37 | logs_path_or_data = plex.downloadLogs()
38 |
39 | # Handle zipfile content based on what we received
40 | if isinstance(logs_path_or_data, str) and os.path.exists(logs_path_or_data) and logs_path_or_data.endswith('.zip'):
41 | # We received a path to a zip file
42 | with zipfile.ZipFile(logs_path_or_data, 'r') as zip_ref:
43 | log_content = extract_log_from_zip(zip_ref, log_file_name)
44 |
45 | # Clean up the downloaded zip if desired
46 | try:
47 | os.remove(logs_path_or_data)
48 | except:
49 | pass # Ignore errors in cleanup
50 | else:
51 | # We received the actual data - process in memory
52 | if isinstance(logs_path_or_data, str):
53 | logs_path_or_data = logs_path_or_data.encode('utf-8')
54 |
55 | try:
56 | # Create an in-memory zip file
57 | zip_buffer = io.BytesIO(logs_path_or_data)
58 | with zipfile.ZipFile(zip_buffer, 'r') as zip_ref:
59 | log_content = extract_log_from_zip(zip_ref, log_file_name)
60 | except zipfile.BadZipFile:
61 | return f"Downloaded data is not a valid zip file. First 100 bytes: {logs_path_or_data[:100]}"
62 |
63 | # Extract the last num_lines from the log content
64 | log_lines = log_content.splitlines()
65 | log_lines = log_lines[-num_lines:] if len(log_lines) > num_lines else log_lines
66 |
67 | result = f"Last {len(log_lines)} lines of {log_file_name}:\n\n"
68 | result += '\n'.join(log_lines)
69 |
70 | return result
71 | except Exception as e:
72 | return f"Error getting Plex logs: {str(e)}\n{traceback.format_exc()}"
73 |
74 | def extract_log_from_zip(zip_ref, log_file_name):
75 | """Extract the requested log file content from a zip file object."""
76 | # List all files in the zip
77 | all_files = zip_ref.namelist()
78 |
79 | # Find the requested log file
80 | log_file_path = None
81 | for file in all_files:
82 | if log_file_name.lower() in os.path.basename(file).lower():
83 | log_file_path = file
84 | break
85 |
86 | if not log_file_path:
87 | raise ValueError(f"Could not find log file for type: {log_file_name}. Available files: {', '.join(all_files)}")
88 |
89 | # Read the log file content
90 | with zip_ref.open(log_file_path) as f:
91 | log_content = f.read().decode('utf-8', errors='ignore')
92 |
93 | return log_content
94 |
95 | @mcp.tool()
96 | async def server_get_info() -> str:
97 | """Get detailed information about the Plex server.
98 |
99 | Returns:
100 | Dictionary containing server details including version, platform, etc.
101 | """
102 | try:
103 | plex = connect_to_plex()
104 | server_info = {
105 | "version": plex.version,
106 | "platform": plex.platform,
107 | "platform_version": plex.platformVersion,
108 | "updated_at": str(plex.updatedAt) if hasattr(plex, 'updatedAt') else None,
109 | "server_name": plex.friendlyName,
110 | "machine_identifier": plex.machineIdentifier,
111 | "my_plex_username": plex.myPlexUsername,
112 | "my_plex_mapping_state": plex.myPlexMappingState if hasattr(plex, 'myPlexMappingState') else None,
113 | "certificate": plex.certificate if hasattr(plex, 'certificate') else None,
114 | "sync": plex.sync if hasattr(plex, 'sync') else None,
115 | "transcoder_active_video_sessions": plex.transcoderActiveVideoSessions,
116 | "transcoder_audio": plex.transcoderAudio if hasattr(plex, 'transcoderAudio') else None,
117 | "transcoder_video_bitrates": plex.transcoderVideoBitrates,
118 | "transcoder_video_qualities": plex.transcoderVideoQualities,
119 | "transcoder_video_resolutions": plex.transcoderVideoResolutions,
120 | "streaming_brain_version": plex.streamingBrainVersion if hasattr(plex, 'streamingBrainVersion') else None,
121 | "owner_features": plex.ownerFeatures if hasattr(plex, 'ownerFeatures') else None
122 | }
123 |
124 | # Format server information as JSON
125 | return json.dumps({"status": "success", "data": server_info}, indent=4)
126 | except Exception as e:
127 | return json.dumps({"status": "error", "message": str(e)}, indent=4)
128 |
129 | @mcp.tool()
130 | async def server_get_bandwidth(timespan: str = None, lan: str = None) -> str:
131 | """Get bandwidth statistics from the Plex server.
132 |
133 | Args:
134 | timespan: Time span for bandwidth data (months, weeks, days, hours, seconds)
135 | lan: Filter by local network (true/false)
136 |
137 | Returns:
138 | Dictionary containing bandwidth statistics
139 | """
140 | try:
141 | plex = connect_to_plex()
142 |
143 | # Get bandwidth information
144 | bandwidth_stats = []
145 |
146 | if hasattr(plex, 'bandwidth'):
147 | # Prepare kwargs for bandwidth() call
148 | kwargs = {}
149 |
150 | # Add timespan if provided
151 | if timespan:
152 | valid_timespans = ['months', 'weeks', 'days', 'hours', 'seconds']
153 | if timespan.lower() in valid_timespans:
154 | kwargs['timespan'] = timespan.lower()
155 |
156 | # Add lan filter if provided
157 | if lan is not None:
158 | if lan.lower() == 'true':
159 | kwargs['lan'] = True
160 | elif lan.lower() == 'false':
161 | kwargs['lan'] = False
162 |
163 | # Call bandwidth with the constructed kwargs
164 | bandwidth_data = plex.bandwidth(**kwargs)
165 |
166 | for bandwidth in bandwidth_data:
167 | # Each bandwidth object has properties like accountID, at, bytes, deviceID, lan, timespan
168 | stats = {
169 | "account": bandwidth.account().name if bandwidth.account() and hasattr(bandwidth.account(), 'name') else None,
170 | "device_id": bandwidth.deviceID if hasattr(bandwidth, 'deviceID') else None,
171 | "device_name": bandwidth.device().name if bandwidth.device() and hasattr(bandwidth.device(), 'name') else None,
172 | "platform": bandwidth.device().platform if bandwidth.device() and hasattr(bandwidth.device(), 'platform') else None,
173 | "client_identifier": bandwidth.device().clientIdentifier if bandwidth.device() and hasattr(bandwidth.device(), 'clientIdentifier') else None,
174 | "at": str(bandwidth.at) if hasattr(bandwidth, 'at') else None,
175 | "bytes": bandwidth.bytes if hasattr(bandwidth, 'bytes') else None,
176 | "is_local": bandwidth.lan if hasattr(bandwidth, 'lan') else None,
177 | "timespan (seconds)": bandwidth.timespan if hasattr(bandwidth, 'timespan') else None
178 | }
179 | bandwidth_stats.append(stats)
180 |
181 | # Format bandwidth information as JSON
182 | return json.dumps({"status": "success", "data": bandwidth_stats}, indent=4)
183 | except Exception as e:
184 | return json.dumps({"status": "error", "message": str(e)}, indent=4)
185 |
186 | @mcp.tool()
187 | async def server_get_current_resources() -> str:
188 | """Get resource usage information from the Plex server.
189 |
190 | Returns:
191 | Dictionary containing resource usage statistics
192 | """
193 | try:
194 | plex = connect_to_plex()
195 |
196 | # Get resource information
197 | resources_data = []
198 |
199 | if hasattr(plex, 'resources'):
200 | server_resources = plex.resources()
201 |
202 | for resource in server_resources:
203 | # Create an entry for each resource timepoint
204 | resource_entry = {
205 | "timestamp": str(resource.at) if hasattr(resource, 'at') else None,
206 | "host_cpu_utilization": resource.hostCpuUtilization if hasattr(resource, 'hostCpuUtilization') else None,
207 | "host_memory_utilization": resource.hostMemoryUtilization if hasattr(resource, 'hostMemoryUtilization') else None,
208 | "process_cpu_utilization": resource.processCpuUtilization if hasattr(resource, 'processCpuUtilization') else None,
209 | "process_memory_utilization": resource.processMemoryUtilization if hasattr(resource, 'processMemoryUtilization') else None,
210 | "timespan": resource.timespan if hasattr(resource, 'timespan') else None
211 | }
212 | resources_data.append(resource_entry)
213 |
214 | # Format resource information as JSON
215 | return json.dumps({"status": "success", "data": resources_data}, indent=4)
216 | except Exception as e:
217 | return json.dumps({"status": "error", "message": str(e)}, indent=4)
218 |
219 | @mcp.tool()
220 | async def server_get_butler_tasks() -> str:
221 | """Get information about Plex Butler tasks.
222 |
223 | Returns:
224 | Dictionary containing information about scheduled and running butler tasks
225 | """
226 | try:
227 | plex = connect_to_plex()
228 |
229 | # Get the base URL and token from the Plex connection
230 | base_url = plex._baseurl
231 | token = plex._token
232 |
233 | # Make a direct API call to the butler endpoint
234 | url = f"{base_url}/butler"
235 | headers = {'X-Plex-Token': token, 'Accept': 'application/xml'}
236 |
237 | # Disable SSL verification if using https
238 | verify = False if base_url.startswith('https') else True
239 |
240 | response = requests.get(url, headers=headers, verify=verify)
241 |
242 | if response.status_code == 200:
243 | # Parse the XML response
244 | import xml.etree.ElementTree as ET
245 | from xml.dom import minidom
246 |
247 | try:
248 | # Try to parse as XML first
249 | root = ET.fromstring(response.text)
250 |
251 | # Extract butler tasks
252 | butler_tasks = []
253 | for task_elem in root.findall('.//ButlerTask'):
254 | task = {}
255 | for attr, value in task_elem.attrib.items():
256 | # Convert boolean attributes
257 | if value.lower() in ['true', 'false']:
258 | task[attr] = value.lower() == 'true'
259 | # Convert numeric attributes
260 | elif value.isdigit():
261 | task[attr] = int(value)
262 | else:
263 | task[attr] = value
264 | butler_tasks.append(task)
265 |
266 | # Return the butler tasks directly in the data field
267 | return json.dumps({"status": "success", "data": butler_tasks}, indent=4)
268 | except ET.ParseError:
269 | # Return the raw response if XML parsing fails
270 | return json.dumps({
271 | "status": "error",
272 | "message": "Failed to parse XML response",
273 | "raw_response": response.text
274 | }, indent=4)
275 | else:
276 | return json.dumps({
277 | "status": "error",
278 | "message": f"Failed to fetch butler tasks. Status code: {response.status_code}",
279 | "response": response.text
280 | }, indent=4)
281 |
282 | except Exception as e:
283 | import traceback
284 | return json.dumps({
285 | "status": "error",
286 | "message": str(e),
287 | "traceback": traceback.format_exc()
288 | }, indent=4)
289 |
290 | @mcp.tool()
291 | async def server_get_alerts(timeout: int = 15) -> str:
292 | """Get real-time alerts from the Plex server by listening on a websocket.
293 |
294 | Args:
295 | timeout: Number of seconds to listen for alerts (default: 15)
296 |
297 | Returns:
298 | Dictionary containing server alerts and their details
299 | """
300 | try:
301 | plex = connect_to_plex()
302 |
303 | # Collection for alerts
304 | alerts_data = []
305 |
306 | # Define callback function to process alerts
307 | def alert_callback(data):
308 | # Print the raw data to help with debugging
309 | print(f"Raw alert data received: {data}")
310 |
311 | try:
312 | # Extract alert information from the raw notification data
313 | # Assuming data is a list/tuple with at least 3 elements as indicated by the log statement
314 | # Format is likely [type, title, description] or similar
315 | alert_type = data[0] if len(data) > 0 else "Unknown"
316 | alert_title = data[1] if len(data) > 1 else "Unknown"
317 | alert_description = data[2] if len(data) > 2 else "No description"
318 |
319 | # Create a simplified single-line text representation of the alert
320 | alert_text = f"ALERT: {alert_type} - {alert_title} - {alert_description}"
321 |
322 | # Print to console in real-time
323 | print(alert_text)
324 |
325 | # Store alert info for JSON response
326 | alert_info = {
327 | "type": alert_type,
328 | "title": alert_title,
329 | "description": alert_description,
330 | "text": alert_text,
331 | "raw_data": data # Include the raw data for complete information
332 | }
333 | alerts_data.append(alert_info)
334 | except Exception as e:
335 | print(f"Error processing alert data: {e}")
336 | # Still try to store some information even if processing fails
337 | alerts_data.append({
338 | "error": str(e),
339 | "raw_data": str(data)
340 | })
341 |
342 | print(f"Starting alert listener for {timeout} seconds...")
343 |
344 | # Start the alert listener
345 | listener = plex.startAlertListener(alert_callback)
346 |
347 | # Wait for the specified timeout period
348 | await asyncio.sleep(timeout)
349 |
350 | # Stop the listener
351 | listener.stop()
352 | print(f"Alert listener stopped after {timeout} seconds.")
353 |
354 | # Format alerts as JSON
355 | return json.dumps({"status": "success", "data": alerts_data}, indent=4)
356 | except Exception as e:
357 | return json.dumps({"status": "error", "message": str(e)}, indent=4)
358 |
359 | @mcp.tool()
360 | async def server_run_butler_task(task_name: str) -> str:
361 | """Manually run a specific Plex Butler task now.
362 |
363 | Args:
364 | task_name: Name of the butler task to run
365 |
366 | Returns:
367 | Success or error message
368 | """
369 | try:
370 | plex = connect_to_plex()
371 |
372 | # Call the runButlerTask method directly on the PlexServer object
373 | # Valid task names: 'BackupDatabase', 'CheckForUpdates', 'CleanOldBundles',
374 | # 'DeepMediaAnalysis', 'GarbageCollection', 'GenerateAutoTags',
375 | # 'OptimizeDatabase', 'RefreshLocalMedia', 'RefreshPeriodicMetadata',
376 | # 'RefreshLibraries', 'UpgradeMediaAnalysis'
377 |
378 | # Make a direct API call to run the butler task
379 | base_url = plex._baseurl
380 | token = plex._token
381 |
382 | # Use the correct URL structure: /butler/{taskName}
383 | url = f"{base_url}/butler/{task_name}"
384 | headers = {'X-Plex-Token': token}
385 |
386 | # Disable SSL verification if using https
387 | verify = False if base_url.startswith('https') else True
388 |
389 | print(f"Running butler task: {task_name}")
390 | response = requests.post(url, headers=headers, verify=verify)
391 |
392 | print(f"Response status: {response.status_code}")
393 | print(f"Response text: {response.text}")
394 |
395 | # Add 202 Accepted to the list of successful status codes
396 | if response.status_code in [200, 201, 202, 204]:
397 | return json.dumps({"status": "success", "message": f"Butler task '{task_name}' started successfully"}, indent=4)
398 | else:
399 | # For error responses, extract the status code and response text in a more readable format
400 | error_message = f"Failed to run butler task. Status code: {response.status_code}"
401 |
402 | # Try to extract a cleaner error message from the HTML response if possible
403 | if "<html>" in response.text:
404 | import re
405 | # Try to extract the status message from an HTML response (like "404 Not Found")
406 | title_match = re.search(r'<title>(.*?)</title>', response.text)
407 | if title_match and title_match.group(1):
408 | error_message = f"Failed to run butler task: {title_match.group(1)}"
409 |
410 | # Or try to extract from an h1 tag
411 | h1_match = re.search(r'<h1>(.*?)</h1>', response.text)
412 | if h1_match and h1_match.group(1):
413 | error_message = f"Failed to run butler task: {h1_match.group(1)}"
414 |
415 | return json.dumps({
416 | "status": "error",
417 | "message": error_message
418 | }, indent=4)
419 |
420 | except Exception as e:
421 | import traceback
422 | return json.dumps({
423 | "status": "error",
424 | "message": str(e),
425 | "traceback": traceback.format_exc()
426 | }, indent=4)
```
--------------------------------------------------------------------------------
/modules/user.py:
--------------------------------------------------------------------------------
```python
1 | from modules import mcp, connect_to_plex
2 | from plexapi.server import PlexServer # type: ignore
3 | import os
4 | import json
5 | import time
6 | import requests
7 | from datetime import datetime, timedelta
8 | from typing import Dict, List, Optional, Any, Union
9 |
10 | try:
11 | from dotenv import load_dotenv # type: ignore
12 | # Load environment variables from .env file
13 | load_dotenv()
14 | PLEX_USERNAME = os.environ.get("PLEX_USERNAME", None)
15 | print("Successfully loaded environment variables from .env file")
16 | except ImportError:
17 | print("Warning: python-dotenv not installed. Environment variables won't be loaded from .env file.")
18 | print("Install with: pip install python-dotenv")
19 |
20 | @mcp.tool()
21 | async def user_search_users(search_term: str = None) -> str:
22 | """Search for users with names, usernames, or emails containing the search term, or list all users if no search term is provided.
23 |
24 | Args:
25 | search_term: Optional term to search for in user information
26 | """
27 | try:
28 | plex = connect_to_plex()
29 |
30 | # Get account associated with the token
31 | account = plex.myPlexAccount()
32 |
33 | # Get list of all friends (shared users)
34 | all_users = account.users()
35 |
36 | # Add the owner's account to be searched as well
37 | all_users.append(account)
38 |
39 | if search_term:
40 | # Search for users that match the search term
41 | found_users = []
42 | for user in all_users:
43 | username = user.username.lower() if hasattr(user, 'username') else ''
44 | email = user.email.lower() if hasattr(user, 'email') else ''
45 | title = user.title.lower() if hasattr(user, 'title') else ''
46 |
47 | if (search_term.lower() in username or
48 | search_term.lower() in email or
49 | search_term.lower() in title):
50 | found_users.append(user)
51 |
52 | if not found_users:
53 | return json.dumps({"message": f"No users found matching '{search_term}'."})
54 |
55 | # Format the output for found users
56 | result = {
57 | "searchTerm": search_term,
58 | "usersFound": len(found_users),
59 | "users": []
60 | }
61 |
62 | for user in found_users:
63 | is_owner = (user.username == account.username)
64 | user_data = {
65 | "role": "Owner" if is_owner else "Shared User",
66 | "username": user.username,
67 | "email": user.email if hasattr(user, 'email') else None,
68 | "title": user.title if hasattr(user, 'title') else user.username
69 | }
70 |
71 | # Add servers this user has access to (for shared users)
72 | if not is_owner and hasattr(user, 'servers'):
73 | sections = []
74 | for server in user.servers:
75 | if server.name == account.title or server.name == account.username:
76 | for section in server.sections():
77 | sections.append(section.title)
78 |
79 | user_data["libraries"] = sections if sections else []
80 |
81 | result["users"].append(user_data)
82 |
83 | return json.dumps(result)
84 | else:
85 | # List all users
86 | if not all_users:
87 | return json.dumps({"message": "No shared users found. Only your account has access to this Plex server."})
88 |
89 | # Format the output for all users
90 | result = {
91 | "totalUsers": len(all_users),
92 | "owner": {
93 | "username": account.username,
94 | "email": account.email,
95 | "title": account.title
96 | },
97 | "sharedUsers": []
98 | }
99 |
100 | # Add all the shared users
101 | for user in all_users:
102 | if user.username != account.username:
103 | result["sharedUsers"].append({
104 | "username": user.username,
105 | "email": user.email if hasattr(user, 'email') else None,
106 | "title": user.title if hasattr(user, 'title') else user.username
107 | })
108 |
109 | return json.dumps(result)
110 | except Exception as e:
111 | return json.dumps({"error": f"Error searching users: {str(e)}"})
112 |
113 | @mcp.tool()
114 | async def user_get_info(username: str = PLEX_USERNAME) -> str:
115 | """Get detailed information about a specific Plex user.
116 |
117 | Args:
118 | username: Optional. Name of the user to get information for. Defaults to PLEX_USERNAME in .env
119 | """
120 | try:
121 | plex = connect_to_plex()
122 |
123 | # Get account associated with the token
124 | account = plex.myPlexAccount()
125 |
126 | # Check if the username is the owner
127 | if username == account.username:
128 | result = {
129 | "role": "Owner",
130 | "username": account.username,
131 | "email": account.email,
132 | "title": account.title,
133 | "uuid": account.uuid,
134 | "authToken": f"{account.authenticationToken[:5]}...{account.authenticationToken[-5:]} (truncated for security)",
135 | "subscription": {
136 | "active": account.subscriptionActive
137 | }
138 | }
139 |
140 | if account.subscriptionActive:
141 | result["subscription"]["features"] = account.subscriptionFeatures
142 |
143 | result["joinedAt"] = str(account.joinedAt)
144 |
145 | return json.dumps(result)
146 |
147 | # Search for the user in the friends list
148 | target_user = None
149 | for user in account.users():
150 | if user.username == username:
151 | target_user = user
152 | break
153 |
154 | if not target_user:
155 | return json.dumps({"error": f"User '{username}' not found among shared users."})
156 |
157 | # Format the output
158 | result = {
159 | "role": "Shared User",
160 | "username": target_user.username,
161 | "email": target_user.email if hasattr(target_user, 'email') else None,
162 | "title": target_user.title if hasattr(target_user, 'title') else target_user.username,
163 | "id": target_user.id if hasattr(target_user, 'id') else None
164 | }
165 |
166 | # Add servers and sections this user has access to
167 | if hasattr(target_user, 'servers'):
168 | result["serverAccess"] = []
169 | for server in target_user.servers:
170 | if server.name == account.title or server.name == account.username:
171 | server_data = {
172 | "name": server.name,
173 | "libraries": []
174 | }
175 | for section in server.sections():
176 | server_data["libraries"].append(section.title)
177 | result["serverAccess"].append(server_data)
178 |
179 | # Get user's devices if available
180 | if hasattr(target_user, 'devices') and callable(getattr(target_user, 'devices')):
181 | try:
182 | devices = target_user.devices()
183 | if devices:
184 | result["devices"] = []
185 | for device in devices:
186 | device_data = {
187 | "name": device.name,
188 | "platform": device.platform
189 | }
190 | if hasattr(device, 'clientIdentifier'):
191 | device_data["clientId"] = device.clientIdentifier
192 | if hasattr(device, 'createdAt'):
193 | device_data["createdAt"] = str(device.createdAt)
194 | if hasattr(device, 'lastSeenAt'):
195 | device_data["lastSeenAt"] = str(device.lastSeenAt)
196 | result["devices"].append(device_data)
197 | except:
198 | result["devices"] = None
199 |
200 | return json.dumps(result)
201 | except Exception as e:
202 | return json.dumps({"error": f"Error getting user info: {str(e)}"})
203 |
204 | @mcp.tool()
205 | async def user_get_on_deck(username: str = PLEX_USERNAME) -> str:
206 | """Get on deck (in progress) media for a specific user.
207 |
208 | Args:
209 | username: Name of the user to get on-deck items for
210 | """
211 | try:
212 | plex = connect_to_plex()
213 |
214 | # Try to switch to the user account to get their specific on-deck items
215 | if username.lower() == plex.myPlexAccount().username.lower():
216 | # This is the main account, use server directly
217 | on_deck_items = plex.library.onDeck()
218 | else:
219 | # For a different user, we need to get access to their account
220 | try:
221 | account = plex.myPlexAccount()
222 |
223 | # Find the user in the shared users
224 | target_user = None
225 | for user in account.users():
226 | if user.username.lower() == username.lower() or user.title.lower() == username.lower():
227 | target_user = user
228 | break
229 |
230 | if not target_user:
231 | return json.dumps({"error": f"User '{username}' not found."})
232 |
233 | # For a shared user, try to switch to that user and get their on-deck items
234 | # This requires admin privileges and may be limited by Plex server's capabilities
235 | user_token = target_user.get_token(plex.machineIdentifier)
236 | if not user_token:
237 | return json.dumps({"error": f"Unable to access on-deck items for user '{username}'. Token not available."})
238 |
239 | user_plex = PlexServer(plex._baseurl, user_token)
240 | on_deck_items = user_plex.library.onDeck()
241 | except Exception as user_err:
242 | return json.dumps({"error": f"Error accessing user '{username}': {str(user_err)}"})
243 |
244 | if not on_deck_items:
245 | return json.dumps({"message": f"No on-deck items found for user '{username}'."})
246 |
247 | result = {
248 | "username": username,
249 | "count": len(on_deck_items),
250 | "items": []
251 | }
252 |
253 | for item in on_deck_items:
254 | media_type = getattr(item, 'type', 'unknown')
255 | title = getattr(item, 'title', 'Unknown Title')
256 |
257 | item_data = {
258 | "type": media_type,
259 | "title": title
260 | }
261 |
262 | if media_type == 'episode':
263 | item_data["show"] = getattr(item, 'grandparentTitle', 'Unknown Show')
264 | item_data["season"] = getattr(item, 'parentTitle', 'Unknown Season')
265 | else:
266 | item_data["year"] = getattr(item, 'year', '')
267 |
268 | # Add progress information
269 | if hasattr(item, 'viewOffset') and hasattr(item, 'duration'):
270 | progress_pct = (item.viewOffset / item.duration) * 100
271 |
272 | # Format as minutes:seconds
273 | total_mins = item.duration // 60000
274 | current_mins = item.viewOffset // 60000
275 | total_secs = (item.duration % 60000) // 1000
276 | current_secs = (item.viewOffset % 60000) // 1000
277 |
278 | # Set progress as a single percentage value
279 | item_data["progress"] = round(progress_pct, 1)
280 |
281 | # Add time info as separate fields
282 | item_data["current_time"] = f"{current_mins}:{current_secs:02d}"
283 | item_data["total_time"] = f"{total_mins}:{total_secs:02d}"
284 |
285 | result["items"].append(item_data)
286 |
287 | return json.dumps(result)
288 | except Exception as e:
289 | return json.dumps({"error": f"Error getting on-deck items: {str(e)}"})
290 |
291 | @mcp.tool()
292 | async def user_get_watch_history(username: str = PLEX_USERNAME, limit: int = 10, content_type: str = None) -> str:
293 | """Get recent watch history for a specific user.
294 |
295 | Args:
296 | username: Name of the user to get watch history for
297 | limit: Maximum number of recently watched items to show
298 | content_type: Optional filter for content type (movie, show, episode, etc)
299 | """
300 | try:
301 | plex = connect_to_plex()
302 | account = plex.myPlexAccount()
303 |
304 | # Track items we've already seen to avoid duplicates when expanding search
305 | seen_item_ids = set()
306 | filtered_items = []
307 | current_search_limit = limit * 2 # Start with 2x the requested limit
308 | max_attempts = 4 # Maximum number of search expansions to prevent infinite loops
309 | attempt = 0
310 |
311 | while len(filtered_items) < limit and attempt < max_attempts:
312 | attempt += 1
313 |
314 | # For the main account owner
315 | if username.lower() == account.username.lower():
316 | history_items = plex.history(maxresults=current_search_limit)
317 | else:
318 | # For a different user, find them in shared users
319 | target_user = None
320 | for user in account.users():
321 | if user.username.lower() == username.lower() or user.title.lower() == username.lower():
322 | target_user = user
323 | break
324 |
325 | if not target_user:
326 | return json.dumps({"error": f"User '{username}' not found."})
327 |
328 | # For a shared user, use accountID to filter history
329 | history_items = plex.history(maxresults=current_search_limit, accountID=target_user.id)
330 |
331 | # Filter by content type and deduplicate
332 | for item in history_items:
333 | item_id = getattr(item, 'ratingKey', None)
334 |
335 | # Skip if we've already processed this item
336 | if item_id and item_id in seen_item_ids:
337 | continue
338 |
339 | # Add to seen items
340 | if item_id:
341 | seen_item_ids.add(item_id)
342 |
343 | # Apply content type filter if specified
344 | item_type = getattr(item, 'type', 'unknown')
345 | if content_type and item_type.lower() != content_type.lower():
346 | continue
347 |
348 | filtered_items.append(item)
349 |
350 | # Stop if we've reached the limit
351 | if len(filtered_items) >= limit:
352 | break
353 |
354 | # If we still need more items, double the search limit for next attempt
355 | if len(filtered_items) < limit and history_items:
356 | current_search_limit *= 2
357 | else:
358 | # Either we have enough items or there are no more to fetch
359 | break
360 |
361 | # If we couldn't find any matching items
362 | if not filtered_items:
363 | message = f"No watch history found for user '{username}'"
364 | if content_type:
365 | message += f" with content type '{content_type}'"
366 | return json.dumps({"message": message})
367 |
368 | # Format the results
369 | result = {
370 | "username": username,
371 | "count": len(filtered_items),
372 | "requestedLimit": limit,
373 | "contentType": content_type,
374 | "items": []
375 | }
376 |
377 | # Add only the requested limit number of items
378 | for item in filtered_items[:limit]:
379 | media_type = getattr(item, 'type', 'unknown')
380 | title = getattr(item, 'title', 'Unknown Title')
381 |
382 | item_data = {
383 | "type": media_type,
384 | "title": title,
385 | "ratingKey": getattr(item, 'ratingKey', None)
386 | }
387 |
388 | # Format based on media type
389 | if media_type == 'episode':
390 | item_data["show"] = getattr(item, 'grandparentTitle', 'Unknown Show')
391 | item_data["season"] = getattr(item, 'parentTitle', 'Unknown Season')
392 | item_data["episodeNumber"] = getattr(item, 'index', None)
393 | item_data["seasonNumber"] = getattr(item, 'parentIndex', None)
394 | else:
395 | item_data["year"] = getattr(item, 'year', '')
396 |
397 | # Add viewed date if available
398 | if hasattr(item, 'viewedAt') and item.viewedAt:
399 | item_data["viewedAt"] = item.viewedAt.strftime("%Y-%m-%d %H:%M")
400 |
401 | result["items"].append(item_data)
402 |
403 | return json.dumps(result)
404 | except Exception as e:
405 | return json.dumps({"error": f"Error getting watch history: {str(e)}"})
406 |
407 | @mcp.tool()
408 | async def user_get_statistics(time_period: str = "last_24_hours", username: str = None) -> str:
409 | """Get statistics about user watch activity over different time periods.
410 |
411 | Args:
412 | time_period: Time period for statistics - options: "last_24_hours", "last_7_days", "last_30_days", "last_90_days", "last_year", "all_time"
413 | username: Optional. Filter statistics for a specific user. If not provided, returns statistics for all users.
414 | """
415 | try:
416 | plex = connect_to_plex()
417 | base_url = plex._baseurl
418 | token = plex._token
419 |
420 | # Get the current epoch time
421 | current_time = int(time.time())
422 |
423 | # Map time_period to Plex API parameters
424 | time_mapping = {
425 | "last_24_hours": {"timespan": 4, "at": current_time - 24*60*60},
426 | "last_7_days": {"timespan": 3, "at": current_time - 7*24*60*60},
427 | "last_30_days": {"timespan": 2, "at": current_time - 30*24*60*60},
428 | "last_90_days": {"timespan": 2, "at": current_time - 90*24*60*60},
429 | "last_year": {"timespan": 1, "at": current_time - 365*24*60*60},
430 | "all_time": {"timespan": 1, "at": 0}
431 | }
432 |
433 | if time_period not in time_mapping:
434 | return json.dumps({"error": f"Invalid time period. Choose from: {', '.join(time_mapping.keys())}"})
435 |
436 | # Build the statistics URL
437 | params = time_mapping[time_period]
438 | stats_url = f"{base_url}/statistics/media?timespan={params['timespan']}&at>={params['at']}"
439 |
440 | # Add Plex headers
441 | headers = {
442 | 'X-Plex-Token': token,
443 | 'Accept': 'application/json'
444 | }
445 |
446 | # Make the request to get statistics
447 | response = requests.get(stats_url, headers=headers)
448 | if response.status_code != 200:
449 | return json.dumps({"error": f"Failed to fetch statistics: HTTP {response.status_code}"})
450 |
451 | data = response.json()
452 |
453 | # Get data from response
454 | container = data.get('MediaContainer', {})
455 | device_list = container.get('Device', [])
456 | account_list = container.get('Account', [])
457 | stats_list = container.get('StatisticsMedia', [])
458 |
459 | # Create lookup dictionaries for accounts and devices
460 | account_lookup: Dict[int, Dict[str, Any]] = {}
461 | for account in account_list:
462 | account_lookup[account.get('id')] = {
463 | 'name': account.get('name'),
464 | 'key': account.get('key'),
465 | 'thumb': account.get('thumb')
466 | }
467 |
468 | device_lookup: Dict[int, Dict[str, Any]] = {}
469 | for device in device_list:
470 | device_lookup[device.get('id')] = {
471 | 'name': device.get('name'),
472 | 'platform': device.get('platform'),
473 | 'clientIdentifier': device.get('clientIdentifier')
474 | }
475 |
476 | # Filter by username if specified
477 | target_account_id = None
478 | if username:
479 | # Get the account ID for the specified username
480 | account = plex.myPlexAccount()
481 |
482 | # Check if the username matches the owner
483 | if username.lower() == account.username.lower():
484 | # Find the owner's account ID in the account list
485 | for acc in account_list:
486 | if acc.get('name').lower() == username.lower():
487 | target_account_id = acc.get('id')
488 | break
489 | else:
490 | # Check shared users
491 | for user in account.users():
492 | if user.username.lower() == username.lower() or (hasattr(user, 'title') and user.title.lower() == username.lower()):
493 | # Find this user's account ID in the account list
494 | for acc in account_list:
495 | if acc.get('name').lower() == user.username.lower():
496 | target_account_id = acc.get('id')
497 | break
498 | break
499 |
500 | if target_account_id is None:
501 | return json.dumps({"error": f"User '{username}' not found in the statistics data."})
502 |
503 | # Process the statistics data
504 | user_stats: Dict[int, Dict[str, Any]] = {}
505 |
506 | # Media type mapping
507 | media_type_map = {
508 | 1: "movie",
509 | 4: "episode",
510 | 10: "track",
511 | 100: "photo"
512 | }
513 |
514 | for stat in stats_list:
515 | account_id = stat.get('accountID')
516 |
517 | # Skip if we're filtering by user and this isn't the target user
518 | if target_account_id is not None and account_id != target_account_id:
519 | continue
520 |
521 | device_id = stat.get('deviceID')
522 | duration = stat.get('duration', 0) # Duration in seconds
523 | count = stat.get('count', 0) # Number of items played
524 | metadata_type = stat.get('metadataType', 0)
525 | media_type = media_type_map.get(metadata_type, f"unknown-{metadata_type}")
526 |
527 | # Initialize user stats if not already present
528 | if account_id not in user_stats:
529 | account_info = account_lookup.get(account_id, {'name': f"Unknown User {account_id}"})
530 | user_stats[account_id] = {
531 | 'user': account_info.get('name'),
532 | 'user_thumb': account_info.get('thumb'),
533 | 'total_duration': 0,
534 | 'total_plays': 0,
535 | 'media_types': {},
536 | 'devices': {}
537 | }
538 |
539 | # Update total duration and play count
540 | user_stats[account_id]['total_duration'] += duration
541 | user_stats[account_id]['total_plays'] += count
542 |
543 | # Update media type stats
544 | if media_type not in user_stats[account_id]['media_types']:
545 | user_stats[account_id]['media_types'][media_type] = {
546 | 'duration': 0,
547 | 'count': 0
548 | }
549 | user_stats[account_id]['media_types'][media_type]['duration'] += duration
550 | user_stats[account_id]['media_types'][media_type]['count'] += count
551 |
552 | # Update device stats
553 | if device_id is not None:
554 | device_info = device_lookup.get(device_id, {'name': f"Unknown Device {device_id}", 'platform': 'unknown'})
555 | device_name = device_info.get('name')
556 |
557 | if device_name not in user_stats[account_id]['devices']:
558 | user_stats[account_id]['devices'][device_name] = {
559 | 'platform': device_info.get('platform'),
560 | 'duration': 0,
561 | 'count': 0
562 | }
563 | user_stats[account_id]['devices'][device_name]['duration'] += duration
564 | user_stats[account_id]['devices'][device_name]['count'] += count
565 |
566 | # Format duration for better readability in each stat entry
567 | for account_id, stats in user_stats.items():
568 | # Format total duration
569 | hours, remainder = divmod(stats['total_duration'], 3600)
570 | minutes, seconds = divmod(remainder, 60)
571 | stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
572 |
573 | # Format media type durations
574 | for media_type, media_stats in stats['media_types'].items():
575 | hours, remainder = divmod(media_stats['duration'], 3600)
576 | minutes, seconds = divmod(remainder, 60)
577 | media_stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
578 |
579 | # Format device durations
580 | for device_name, device_stats in stats['devices'].items():
581 | hours, remainder = divmod(device_stats['duration'], 3600)
582 | minutes, seconds = divmod(remainder, 60)
583 | device_stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
584 |
585 | # Sort users by total duration (descending)
586 | sorted_users = sorted(
587 | user_stats.values(),
588 | key=lambda x: x['total_duration'],
589 | reverse=True
590 | )
591 |
592 | # Format the final result
593 | result = {
594 | "time_period": time_period,
595 | "user_filter": username,
596 | "total_users": len(sorted_users),
597 | "stats_generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
598 | "users": sorted_users
599 | }
600 |
601 | return json.dumps(result)
602 | except Exception as e:
603 | return json.dumps({"error": f"Error getting user statistics: {str(e)}"})
```
--------------------------------------------------------------------------------
/modules/library.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 | import requests
3 | import aiohttp
4 | import asyncio
5 | from plexapi.exceptions import NotFound # type: ignore
6 | from modules import mcp, connect_to_plex
7 | from urllib.parse import urljoin
8 | import time
9 |
10 | def get_plex_headers(plex):
11 | """Get standard Plex headers for HTTP requests"""
12 | return {
13 | 'X-Plex-Token': plex._token,
14 | 'Accept': 'application/json'
15 | }
16 |
17 | async def async_get_json(session, url, headers):
18 | """Helper function to make async HTTP requests"""
19 | async with session.get(url, headers=headers) as response:
20 | return await response.json()
21 |
22 | @mcp.tool()
23 | async def library_list() -> str:
24 | """List all available libraries on the Plex server."""
25 | try:
26 | plex = connect_to_plex()
27 | libraries = plex.library.sections()
28 |
29 | if not libraries:
30 | return json.dumps({"message": "No libraries found on your Plex server."})
31 |
32 | libraries_dict = {}
33 | for lib in libraries:
34 | libraries_dict[lib.title] = {
35 | "type": lib.type,
36 | "libraryId": lib.key,
37 | "totalSize": lib.totalSize,
38 | "uuid": lib.uuid,
39 | "locations": lib.locations,
40 | "updatedAt": lib.updatedAt.isoformat()
41 | }
42 |
43 | return json.dumps(libraries_dict)
44 | except Exception as e:
45 | return json.dumps({"error": f"Error listing libraries: {str(e)}"})
46 |
47 | @mcp.tool()
48 | async def library_get_stats(library_name: str) -> str:
49 | """Get statistics for a specific library.
50 |
51 | Args:
52 | library_name: Name of the library to get stats for
53 | """
54 | try:
55 | plex = connect_to_plex()
56 | base_url = plex._baseurl
57 | headers = get_plex_headers(plex)
58 |
59 | async with aiohttp.ClientSession() as session:
60 | # First get library sections
61 | sections_url = urljoin(base_url, 'library/sections')
62 | sections_data = await async_get_json(session, sections_url, headers)
63 |
64 | target_section = None
65 | for section in sections_data['MediaContainer']['Directory']:
66 | if section['title'].lower() == library_name.lower():
67 | target_section = section
68 | break
69 |
70 | if not target_section:
71 | return json.dumps({"error": f"Library '{library_name}' not found"})
72 |
73 | section_id = target_section['key']
74 | library_type = target_section['type']
75 |
76 | # Create base result
77 | result = {
78 | "name": target_section['title'],
79 | "type": library_type,
80 | "totalItems": target_section.get('totalSize', 0)
81 | }
82 |
83 | # Prepare URLs for concurrent requests
84 | all_items_url = urljoin(base_url, f'library/sections/{section_id}/all')
85 | unwatched_url = urljoin(base_url, f'library/sections/{section_id}/all?unwatched=1')
86 |
87 | # Make concurrent requests for all and unwatched items
88 | all_data, unwatched_data = await asyncio.gather(
89 | async_get_json(session, all_items_url, headers),
90 | async_get_json(session, unwatched_url, headers)
91 | )
92 | all_data = all_data['MediaContainer']
93 | unwatched_data = unwatched_data['MediaContainer']
94 |
95 | if library_type == 'movie':
96 | movie_stats = {
97 | "count": all_data.get('size', 0),
98 | "unwatched": unwatched_data.get('size', 0)
99 | }
100 |
101 | # Get genres, directors, studios stats
102 | genres = {}
103 | directors = {}
104 | studios = {}
105 | decades = {}
106 |
107 | for movie in all_data.get('Metadata', []):
108 | # Process genres
109 | for genre in movie.get('Genre', []):
110 | genre_name = genre['tag']
111 | genres[genre_name] = genres.get(genre_name, 0) + 1
112 |
113 | # Process directors
114 | for director in movie.get('Director', []):
115 | director_name = director['tag']
116 | directors[director_name] = directors.get(director_name, 0) + 1
117 |
118 | # Process studios
119 | studio = movie.get('studio')
120 | if studio:
121 | studios[studio] = studios.get(studio, 0) + 1
122 |
123 | # Process decades
124 | year = movie.get('year')
125 | if year:
126 | decade = (year // 10) * 10
127 | decades[decade] = decades.get(decade, 0) + 1
128 |
129 | # Add top items to results
130 | if genres:
131 | movie_stats["topGenres"] = dict(sorted(genres.items(), key=lambda x: x[1], reverse=True)[:5])
132 | if directors:
133 | movie_stats["topDirectors"] = dict(sorted(directors.items(), key=lambda x: x[1], reverse=True)[:5])
134 | if studios:
135 | movie_stats["topStudios"] = dict(sorted(studios.items(), key=lambda x: x[1], reverse=True)[:5])
136 | if decades:
137 | movie_stats["byDecade"] = dict(sorted(decades.items()))
138 |
139 | result["movieStats"] = movie_stats
140 |
141 | elif library_type == 'show':
142 | # Prepare URLs for concurrent requests
143 | seasons_url = urljoin(base_url, f'library/sections/{section_id}/all?type=3')
144 | episodes_url = urljoin(base_url, f'library/sections/{section_id}/all?type=4')
145 |
146 | # Make concurrent requests for seasons and episodes
147 | seasons_data, episodes_data = await asyncio.gather(
148 | async_get_json(session, seasons_url, headers),
149 | async_get_json(session, episodes_url, headers)
150 | )
151 | seasons_data = seasons_data['MediaContainer']
152 | episodes_data = episodes_data['MediaContainer']
153 |
154 | # Process show stats
155 | genres = {}
156 | studios = {}
157 | decades = {}
158 |
159 | for show in all_data.get('Metadata', []):
160 | # Process genres
161 | for genre in show.get('Genre', []):
162 | genre_name = genre['tag']
163 | genres[genre_name] = genres.get(genre_name, 0) + 1
164 |
165 | # Process studios
166 | studio = show.get('studio')
167 | if studio:
168 | studios[studio] = studios.get(studio, 0) + 1
169 |
170 | # Process decades
171 | year = show.get('year')
172 | if year:
173 | decade = (year // 10) * 10
174 | decades[decade] = decades.get(decade, 0) + 1
175 |
176 | show_stats = {
177 | "shows": all_data.get('size', 0),
178 | "seasons": seasons_data.get('size', 0),
179 | "episodes": episodes_data.get('size', 0),
180 | "unwatchedShows": unwatched_data.get('size', 0)
181 | }
182 |
183 | # Add top items to results
184 | if genres:
185 | show_stats["topGenres"] = dict(sorted(genres.items(), key=lambda x: x[1], reverse=True)[:5])
186 | if studios:
187 | show_stats["topStudios"] = dict(sorted(studios.items(), key=lambda x: x[1], reverse=True)[:5])
188 | if decades:
189 | show_stats["byDecade"] = dict(sorted(decades.items()))
190 |
191 | result["showStats"] = show_stats
192 |
193 | elif library_type == 'artist':
194 | # Initialize statistics
195 | artist_stats = {
196 | "count": all_data.get('size', 0),
197 | "totalTracks": 0,
198 | "totalAlbums": 0,
199 | "totalPlays": 0
200 | }
201 |
202 | # Track data for statistics
203 | all_genres = {}
204 | all_years = {}
205 | top_artists = {}
206 | top_albums = {}
207 | audio_formats = {}
208 |
209 | # Process artists one by one for accurate stats
210 | for artist in all_data.get('Metadata', []):
211 | artist_id = artist.get('ratingKey')
212 | artist_name = artist.get('title', '')
213 |
214 | if not artist_id:
215 | continue
216 |
217 | # Store artist views for top artists calculation
218 | artist_view_count = 0
219 | artist_albums = set()
220 | artist_track_count = 0
221 |
222 | # Get tracks directly for this artist
223 | artist_tracks_url = urljoin(base_url, f'library/sections/{section_id}/all?artist.id={artist_id}&type=10')
224 | artist_tracks_data = await async_get_json(session, artist_tracks_url, headers)
225 |
226 | if 'MediaContainer' in artist_tracks_data and 'Metadata' in artist_tracks_data['MediaContainer']:
227 | for track in artist_tracks_data['MediaContainer']['Metadata']:
228 | # Count total tracks
229 | artist_track_count += 1
230 |
231 | # Count track views for this artist
232 | track_views = track.get('viewCount', 0)
233 | artist_view_count += track_views
234 | artist_stats["totalPlays"] += track_views
235 |
236 | # Add album to set
237 | album_title = track.get('parentTitle')
238 | if album_title:
239 | artist_albums.add(album_title)
240 |
241 | # Track album plays for top albums
242 | album_key = f"{artist_name} - {album_title}"
243 | if album_key not in top_albums:
244 | top_albums[album_key] = 0
245 | top_albums[album_key] += track_views
246 |
247 | # Process genres if available
248 | if 'Genre' in track:
249 | for genre in track.get('Genre', []):
250 | genre_name = genre['tag']
251 | all_genres[genre_name] = all_genres.get(genre_name, 0) + 1
252 |
253 | # Process years instead of decades
254 | year = track.get('parentYear') or track.get('year')
255 | if year:
256 | all_years[year] = all_years.get(year, 0) + 1
257 |
258 | # Track audio formats
259 | if 'Media' in track and track['Media'] and 'audioCodec' in track['Media'][0]:
260 | audio_codec = track['Media'][0]['audioCodec']
261 | audio_formats[audio_codec] = audio_formats.get(audio_codec, 0) + 1
262 |
263 | # Update top artists
264 | if artist_track_count > 0:
265 | top_artists[artist_name] = artist_view_count
266 |
267 | # Update totals
268 | artist_stats["totalTracks"] += artist_track_count
269 | artist_stats["totalAlbums"] += len(artist_albums)
270 |
271 | # Add top items to results
272 | if all_genres:
273 | artist_stats["topGenres"] = dict(sorted(all_genres.items(), key=lambda x: x[1], reverse=True)[:10])
274 | if top_artists:
275 | artist_stats["topArtists"] = dict(sorted(top_artists.items(), key=lambda x: x[1], reverse=True)[:10])
276 | if top_albums:
277 | artist_stats["topAlbums"] = dict(sorted(top_albums.items(), key=lambda x: x[1], reverse=True)[:10])
278 | if all_years:
279 | artist_stats["byYear"] = dict(sorted(all_years.items()))
280 | if audio_formats:
281 | artist_stats["audioFormats"] = audio_formats
282 |
283 | result["musicStats"] = artist_stats
284 |
285 | return json.dumps(result)
286 |
287 | except Exception as e:
288 | return json.dumps({"error": f"Error getting library stats: {str(e)}"})
289 |
290 | @mcp.tool()
291 | async def library_refresh(library_name: str = None) -> str:
292 | """Refresh a specific library or all libraries.
293 |
294 | Args:
295 | library_name: Optional name of the library to refresh (refreshes all if None)
296 | """
297 | try:
298 | plex = connect_to_plex()
299 |
300 | if library_name:
301 | # Refresh a specific library
302 | section = None
303 | all_sections = plex.library.sections()
304 |
305 | # Find the section with matching name (case-insensitive)
306 | for s in all_sections:
307 | if s.title.lower() == library_name.lower():
308 | section = s
309 | break
310 |
311 | if not section:
312 | return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
313 |
314 | # Refresh the library
315 | section.refresh()
316 | return json.dumps({"success": True, "message": f"Refreshing library '{section.title}'. This may take some time."})
317 | else:
318 | # Refresh all libraries
319 | plex.library.refresh()
320 | return json.dumps({"success": True, "message": "Refreshing all libraries. This may take some time."})
321 | except Exception as e:
322 | return json.dumps({"error": f"Error refreshing library: {str(e)}"})
323 |
324 | @mcp.tool()
325 | async def library_scan(library_name: str, path: str = None) -> str:
326 | """Scan a specific library or part of a library.
327 |
328 | Args:
329 | library_name: Name of the library to scan
330 | path: Optional specific path to scan within the library
331 | """
332 | try:
333 | plex = connect_to_plex()
334 |
335 | # Find the specified library
336 | section = None
337 | all_sections = plex.library.sections()
338 |
339 | # Find the section with matching name (case-insensitive)
340 | for s in all_sections:
341 | if s.title.lower() == library_name.lower():
342 | section = s
343 | break
344 |
345 | if not section:
346 | return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
347 |
348 | # Scan the library
349 | if path:
350 | try:
351 | section.update(path=path)
352 | return json.dumps({"success": True, "message": f"Scanning path '{path}' in library '{section.title}'. This may take some time."})
353 | except NotFound:
354 | return json.dumps({"error": f"Path '{path}' not found in library '{section.title}'."})
355 | else:
356 | section.update()
357 | return json.dumps({"success": True, "message": f"Scanning library '{section.title}'. This may take some time."})
358 | except Exception as e:
359 | return json.dumps({"error": f"Error scanning library: {str(e)}"})
360 |
361 | @mcp.tool()
362 | async def library_get_details(library_name: str) -> str:
363 | """Get detailed information about a specific library, including folder paths and settings.
364 |
365 | Args:
366 | library_name: Name of the library to get details for
367 | """
368 | try:
369 | plex = connect_to_plex()
370 |
371 | # Get all library sections
372 | all_sections = plex.library.sections()
373 | target_section = None
374 |
375 | # Find the section with the matching name (case-insensitive)
376 | for section in all_sections:
377 | if section.title.lower() == library_name.lower():
378 | target_section = section
379 | break
380 |
381 | if not target_section:
382 | return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
383 |
384 | # Create the result dictionary
385 | result = {
386 | "name": target_section.title,
387 | "type": target_section.type,
388 | "uuid": target_section.uuid,
389 | "totalItems": target_section.totalSize,
390 | "locations": target_section.locations,
391 | "agent": target_section.agent,
392 | "scanner": target_section.scanner,
393 | "language": target_section.language
394 | }
395 |
396 | # Get additional attributes using _data
397 | data = target_section._data
398 |
399 | # Add scanner settings if available
400 | if 'scannerSettings' in data:
401 | scanner_settings = {}
402 | for setting in data['scannerSettings']:
403 | if 'value' in setting:
404 | scanner_settings[setting.get('key', 'unknown')] = setting['value']
405 | if scanner_settings:
406 | result["scannerSettings"] = scanner_settings
407 |
408 | # Add agent settings if available
409 | if 'agentSettings' in data:
410 | agent_settings = {}
411 | for setting in data['agentSettings']:
412 | if 'value' in setting:
413 | agent_settings[setting.get('key', 'unknown')] = setting['value']
414 | if agent_settings:
415 | result["agentSettings"] = agent_settings
416 |
417 | # Add advanced settings if available
418 | if 'advancedSettings' in data:
419 | advanced_settings = {}
420 | for setting in data['advancedSettings']:
421 | if 'value' in setting:
422 | advanced_settings[setting.get('key', 'unknown')] = setting['value']
423 | if advanced_settings:
424 | result["advancedSettings"] = advanced_settings
425 |
426 | return json.dumps(result)
427 | except Exception as e:
428 | return json.dumps({"error": f"Error getting library details: {str(e)}"})
429 |
430 | @mcp.tool()
431 | async def library_get_recently_added(count: int = 50, library_name: str = None) -> str:
432 | """Get recently added media across all libraries or in a specific library.
433 |
434 | Args:
435 | count: Number of items to return (default: 50)
436 | library_name: Optional library name to limit results to
437 | """
438 | try:
439 | plex = connect_to_plex()
440 |
441 | # Check if we need to filter by library
442 | if library_name:
443 | # Find the specified library
444 | section = None
445 | all_sections = plex.library.sections()
446 |
447 | # Find the section with matching name (case-insensitive)
448 | for s in all_sections:
449 | if s.title.lower() == library_name.lower():
450 | section = s
451 | break
452 |
453 | if not section:
454 | return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
455 |
456 | # Get recently added from this library
457 | recent = section.recentlyAdded(maxresults=count)
458 | else:
459 | # Get recently added across all libraries
460 | recent = plex.library.recentlyAdded()
461 | # Sort by date added (newest first) and limit to count
462 | if recent:
463 | try:
464 | recent = sorted(recent, key=lambda x: getattr(x, 'addedAt', None), reverse=True)[:count]
465 | except Exception as sort_error:
466 | # If sorting fails, just take the first 'count' items
467 | recent = recent[:count]
468 |
469 | if not recent:
470 | return json.dumps({"message": "No recently added items found."})
471 |
472 | # Prepare the result
473 | result = {
474 | "count": len(recent),
475 | "requestedCount": count,
476 | "library": library_name if library_name else "All Libraries",
477 | "items": {}
478 | }
479 |
480 | # Group results by type
481 | for item in recent:
482 | item_type = getattr(item, 'type', 'unknown')
483 | if item_type not in result["items"]:
484 | result["items"][item_type] = []
485 |
486 | try:
487 | added_at = str(getattr(item, 'addedAt', 'Unknown date'))
488 |
489 | if item_type == 'movie':
490 | result["items"][item_type].append({
491 | "title": item.title,
492 | "year": getattr(item, 'year', ''),
493 | "addedAt": added_at
494 | })
495 |
496 | elif item_type == 'show':
497 | result["items"][item_type].append({
498 | "title": item.title,
499 | "year": getattr(item, 'year', ''),
500 | "addedAt": added_at
501 | })
502 |
503 | elif item_type == 'season':
504 | result["items"][item_type].append({
505 | "showTitle": getattr(item, 'parentTitle', 'Unknown Show'),
506 | "seasonNumber": getattr(item, 'index', '?'),
507 | "addedAt": added_at
508 | })
509 |
510 | elif item_type == 'episode':
511 | result["items"][item_type].append({
512 | "showTitle": getattr(item, 'grandparentTitle', 'Unknown Show'),
513 | "seasonNumber": getattr(item, 'parentIndex', '?'),
514 | "episodeNumber": getattr(item, 'index', '?'),
515 | "title": item.title,
516 | "addedAt": added_at
517 | })
518 |
519 | elif item_type == 'artist':
520 | result["items"][item_type].append({
521 | "title": item.title,
522 | "addedAt": added_at
523 | })
524 |
525 | elif item_type == 'album':
526 | result["items"][item_type].append({
527 | "artist": getattr(item, 'parentTitle', 'Unknown Artist'),
528 | "title": item.title,
529 | "addedAt": added_at
530 | })
531 |
532 | elif item_type == 'track':
533 | result["items"][item_type].append({
534 | "artist": getattr(item, 'grandparentTitle', 'Unknown Artist'),
535 | "album": getattr(item, 'parentTitle', 'Unknown Album'),
536 | "title": item.title,
537 | "addedAt": added_at
538 | })
539 |
540 | else:
541 | # Generic handler for other types
542 | result["items"][item_type].append({
543 | "title": getattr(item, 'title', 'Unknown'),
544 | "addedAt": added_at
545 | })
546 |
547 | except Exception as format_error:
548 | # If there's an error formatting a particular item, just output basic info
549 | result["items"][item_type].append({
550 | "title": getattr(item, 'title', 'Unknown'),
551 | "error": str(format_error)
552 | })
553 |
554 | return json.dumps(result)
555 | except Exception as e:
556 | return json.dumps({"error": f"Error getting recently added items: {str(e)}"})
557 |
558 | @mcp.tool()
559 | async def library_get_contents(library_name: str) -> str:
560 | """Get the full contents of a specific library.
561 |
562 | Args:
563 | library_name: Name of the library to get contents from
564 |
565 | Returns:
566 | String listing all items in the library
567 | """
568 | try:
569 | plex = connect_to_plex()
570 | base_url = plex._baseurl
571 | headers = get_plex_headers(plex)
572 |
573 | async with aiohttp.ClientSession() as session:
574 | # First get library sections
575 | sections_url = urljoin(base_url, 'library/sections')
576 | sections_data = await async_get_json(session, sections_url, headers)
577 |
578 | target_section = None
579 | for section in sections_data['MediaContainer']['Directory']:
580 | if section['title'].lower() == library_name.lower():
581 | target_section = section
582 | break
583 |
584 | if not target_section:
585 | return json.dumps({"error": f"Library '{library_name}' not found"})
586 |
587 | section_id = target_section['key']
588 | library_type = target_section['type']
589 |
590 | # Get all items
591 | all_items_url = urljoin(base_url, f'library/sections/{section_id}/all')
592 | all_data = await async_get_json(session, all_items_url, headers)
593 | all_data = all_data['MediaContainer']
594 |
595 | # Prepare the result
596 | result = {
597 | "name": target_section['title'],
598 | "type": library_type,
599 | "totalItems": all_data.get('size', 0),
600 | "items": []
601 | }
602 |
603 | # Process items based on library type
604 | if library_type == 'movie':
605 | for item in all_data.get('Metadata', []):
606 | year = item.get('year', 'Unknown')
607 | duration = item.get('duration', 0)
608 | # Convert duration from milliseconds to hours and minutes
609 | hours, remainder = divmod(duration // 1000, 3600)
610 | minutes, seconds = divmod(remainder, 60)
611 |
612 | # Get media info
613 | media_info = {}
614 | if 'Media' in item:
615 | media = item['Media'][0] if item['Media'] else {}
616 | resolution = media.get('videoResolution', '')
617 | codec = media.get('videoCodec', '')
618 | if resolution and codec:
619 | media_info = {
620 | "resolution": resolution,
621 | "codec": codec
622 | }
623 |
624 | # Check if watched
625 | watched = item.get('viewCount', 0) > 0
626 |
627 | result["items"].append({
628 | "title": item.get('title', ''),
629 | "year": year,
630 | "duration": {
631 | "hours": hours,
632 | "minutes": minutes
633 | },
634 | "mediaInfo": media_info,
635 | "watched": watched
636 | })
637 |
638 | elif library_type == 'show':
639 | # Get all shows metadata in parallel
640 | show_urls = [
641 | (item["ratingKey"], urljoin(base_url, f'library/metadata/{item["ratingKey"]}'))
642 | for item in all_data.get('Metadata', [])
643 | ]
644 | show_responses = await asyncio.gather(
645 | *[async_get_json(session, url, headers) for _, url in show_urls]
646 | )
647 |
648 | for item, show_data in zip(all_data.get('Metadata', []), show_responses):
649 | show_data = show_data['MediaContainer']['Metadata'][0]
650 |
651 | year = item.get('year', 'Unknown')
652 | season_count = show_data.get('childCount', 0)
653 | episode_count = show_data.get('leafCount', 0)
654 | watched = episode_count > 0 and show_data.get('viewedLeafCount', 0) == episode_count
655 |
656 | result["items"].append({
657 | "title": item.get('title', ''),
658 | "year": year,
659 | "seasonCount": season_count,
660 | "episodeCount": episode_count,
661 | "watched": watched
662 | })
663 |
664 | elif library_type == 'artist':
665 | # Process artists one by one for more accurate track/album counting
666 | artists_info = {}
667 |
668 | for artist in all_data.get('Metadata', []):
669 | artist_id = artist.get('ratingKey')
670 | artist_name = artist.get('title', '')
671 |
672 | if not artist_id:
673 | continue
674 |
675 | # Store the original artist viewCount and skipCount as fallback
676 | orig_view_count = artist.get('viewCount', 0)
677 | orig_skip_count = artist.get('skipCount', 0)
678 |
679 | # Get tracks directly for this artist
680 | artist_tracks_url = urljoin(base_url, f'library/sections/{section_id}/all?artist.id={artist_id}&type=10')
681 | artist_tracks_data = await async_get_json(session, artist_tracks_url, headers)
682 |
683 | # Initialize artist data
684 | if artist_name not in artists_info:
685 | artists_info[artist_name] = {
686 | "title": artist_name,
687 | "albums": set(),
688 | "trackCount": 0,
689 | "viewCount": 0,
690 | "skipCount": 0
691 | }
692 |
693 | # Count tracks and albums from the track-level data
694 | track_view_count = 0
695 | track_skip_count = 0
696 | if 'MediaContainer' in artist_tracks_data and 'Metadata' in artist_tracks_data['MediaContainer']:
697 | for track in artist_tracks_data['MediaContainer']['Metadata']:
698 | # Count each track
699 | artists_info[artist_name]["trackCount"] += 1
700 |
701 | # Add album to set (to get unique album count)
702 | if 'parentTitle' in track and track['parentTitle']:
703 | artists_info[artist_name]["albums"].add(track['parentTitle'])
704 |
705 | # Count views and skips
706 | track_view_count += track.get('viewCount', 0)
707 | track_skip_count += track.get('skipCount', 0)
708 |
709 | # Use the sum of track counts if they're non-zero, otherwise fall back to artist level counts
710 | artists_info[artist_name]["viewCount"] = track_view_count if track_view_count > 0 else orig_view_count
711 | artists_info[artist_name]["skipCount"] = track_skip_count if track_skip_count > 0 else orig_skip_count
712 |
713 | # Convert album sets to counts and add to results
714 | for artist_name, info in artists_info.items():
715 | result["items"].append({
716 | "title": info["title"],
717 | "albumCount": len(info["albums"]),
718 | "trackCount": info["trackCount"],
719 | "viewCount": info["viewCount"],
720 | "skipCount": info["skipCount"]
721 | })
722 |
723 | else:
724 | # Generic handler for other types
725 | for item in all_data.get('Metadata', []):
726 | result["items"].append({
727 | "title": item.get('title', '')
728 | })
729 |
730 | return json.dumps(result)
731 |
732 | except Exception as e:
733 | return json.dumps({"error": f"Error getting library contents: {str(e)}"})
734 |
```
--------------------------------------------------------------------------------
/modules/collection.py:
--------------------------------------------------------------------------------
```python
1 | from plexapi.collection import Collection # type: ignore
2 | from typing import List, Dict, Any
3 | from modules import mcp, connect_to_plex
4 | import os
5 | from plexapi.exceptions import NotFound, BadRequest # type: ignore
6 | import json
7 |
8 | @mcp.tool()
9 | async def collection_list(library_name: str = None) -> str:
10 | """List all collections on the Plex server or in a specific library.
11 |
12 | Args:
13 | library_name: Optional name of the library to list collections from
14 | """
15 | try:
16 | plex = connect_to_plex()
17 | collections_data = []
18 |
19 | # If library_name is provided, only show collections from that library
20 | if library_name:
21 | try:
22 | library = plex.library.section(library_name)
23 | collections = library.collections()
24 | for collection in collections:
25 | collection_info = {
26 | "title": collection.title,
27 | "summary": collection.summary,
28 | "is_smart": collection.smart,
29 | "ID": collection.ratingKey,
30 | "items": collection.childCount
31 | }
32 | collections_data.append(collection_info)
33 |
34 | return json.dumps(collections_data, indent=4)
35 | except NotFound:
36 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
37 |
38 | # No library specified, get collections from all movie and show libraries
39 | movie_libraries = []
40 | show_libraries = []
41 |
42 | for section in plex.library.sections():
43 | if section.type == 'movie':
44 | movie_libraries.append(section)
45 | elif section.type == 'show':
46 | show_libraries.append(section)
47 |
48 | # Group collections by library
49 | libraries_collections = {}
50 |
51 | # Get movie collections
52 | for library in movie_libraries:
53 | lib_collections = []
54 |
55 | for collection in library.collections():
56 | collection_info = {
57 | "title": collection.title,
58 | "summary": collection.summary,
59 | "is_smart": collection.smart,
60 | "ID": collection.ratingKey,
61 | "items": collection.childCount
62 | }
63 | lib_collections.append(collection_info)
64 |
65 | libraries_collections[library.title] = {
66 | "type": "movie",
67 | "collections_count": len(lib_collections),
68 | "collections": lib_collections
69 | }
70 |
71 | # Get TV show collections
72 | for library in show_libraries:
73 | lib_collections = []
74 |
75 | for collection in library.collections():
76 | collection_info = {
77 | "title": collection.title,
78 | "summary": collection.summary,
79 | "is_smart": collection.smart,
80 | "ID": collection.ratingKey,
81 | "items": collection.childCount
82 | }
83 | lib_collections.append(collection_info)
84 |
85 | libraries_collections[library.title] = {
86 | "type": "show",
87 | "collections_count": len(lib_collections),
88 | "collections": lib_collections
89 | }
90 |
91 | return json.dumps(libraries_collections, indent=4)
92 | except Exception as e:
93 | return json.dumps({"error": str(e)}, indent=4)
94 |
95 | @mcp.tool()
96 | async def collection_create(collection_title: str, library_name: str, item_titles: List[str] = None, item_ids: List[int] = None) -> str:
97 | """Create a new collection with specified items.
98 |
99 | Args:
100 | collection_title: Title for the new collection
101 | library_name: Name of the library to create the collection in
102 | item_titles: List of media titles to include in the collection (optional if item_ids is provided)
103 | item_ids: List of media IDs to include in the collection (optional if item_titles is provided)
104 | """
105 | try:
106 | plex = connect_to_plex()
107 |
108 | # Validate that at least one item source is provided
109 | if (not item_titles or len(item_titles) == 0) and (not item_ids or len(item_ids) == 0):
110 | return json.dumps({"error": "Either item_titles or item_ids must be provided"}, indent=4)
111 |
112 | # Find the library
113 | try:
114 | library = plex.library.section(library_name)
115 | except NotFound:
116 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
117 |
118 | # Check if collection already exists
119 | try:
120 | existing_collection = next((c for c in library.collections() if c.title.lower() == collection_title.lower()), None)
121 | if existing_collection:
122 | return json.dumps({"error": f"Collection '{collection_title}' already exists in library '{library_name}'"}, indent=4)
123 | except Exception:
124 | pass # If we can't check existing collections, proceed anyway
125 |
126 | # Find items to add to the collection
127 | items = []
128 | not_found = []
129 |
130 | # If we have item IDs, try to add by ID first
131 | if item_ids and len(item_ids) > 0:
132 | for item_id in item_ids:
133 | try:
134 | # Try to fetch the item by ID
135 | item = plex.fetchItem(item_id)
136 | if item:
137 | items.append(item)
138 | else:
139 | not_found.append(str(item_id))
140 | except Exception as e:
141 | not_found.append(str(item_id))
142 |
143 | # If we have item titles, search for them
144 | if item_titles and len(item_titles) > 0:
145 | for title in item_titles:
146 | # Search for the media item
147 | search_results = library.search(title=title)
148 |
149 | if search_results:
150 | # Check for exact title match (case insensitive)
151 | exact_matches = [item for item in search_results if item.title.lower() == title.lower()]
152 |
153 | if exact_matches:
154 | items.append(exact_matches[0])
155 | else:
156 | # No exact match, collect possible matches
157 | possible_matches = []
158 | for item in search_results:
159 | possible_matches.append({
160 | "title": item.title,
161 | "id": item.ratingKey,
162 | "type": item.type,
163 | "year": item.year if hasattr(item, 'year') and item.year else None
164 | })
165 |
166 | not_found.append({
167 | "title": title,
168 | "possible_matches": possible_matches
169 | })
170 | else:
171 | not_found.append(title)
172 |
173 | # If we have possible matches but no items to add, return the possible matches
174 | if not items and any(isinstance(item, dict) for item in not_found):
175 | possible_matches_response = []
176 | for item in not_found:
177 | if isinstance(item, dict) and "possible_matches" in item:
178 | for match in item["possible_matches"]:
179 | if match not in possible_matches_response:
180 | possible_matches_response.append(match)
181 |
182 | return json.dumps({"Multiple Possible Matches Use ID":possible_matches_response}, indent=4)
183 |
184 | if not items:
185 | return json.dumps({"error": "No matching media items found for the collection"}, indent=4)
186 |
187 | # Create the collection
188 | collection = library.createCollection(title=collection_title, items=items)
189 |
190 | return json.dumps({
191 | "created": True,
192 | "title": collection.title,
193 | "id": collection.ratingKey,
194 | "library": library_name,
195 | "items_added": len(items),
196 | "items_not_found": [item for item in not_found if not isinstance(item, dict)]
197 | }, indent=4)
198 | except Exception as e:
199 | return json.dumps({"error": str(e)}, indent=4)
200 |
201 | @mcp.tool()
202 | async def collection_add_to(collection_title: str = None, collection_id: int = None, library_name: str = None, item_titles: List[str] = None, item_ids: List[int] = None) -> str:
203 | """Add items to an existing collection.
204 |
205 | Args:
206 | collection_title: Title of the collection to add to (optional if collection_id is provided)
207 | collection_id: ID of the collection to add to (optional if collection_title is provided)
208 | library_name: Name of the library containing the collection (required if using collection_title)
209 | item_titles: List of media titles to add to the collection (optional if item_ids is provided)
210 | item_ids: List of media IDs to add to the collection (optional if item_titles is provided)
211 | """
212 | try:
213 | plex = connect_to_plex()
214 |
215 | # Validate that at least one identifier is provided
216 | if not collection_id and not collection_title:
217 | return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
218 |
219 | # Validate that at least one item source is provided
220 | if (not item_titles or len(item_titles) == 0) and (not item_ids or len(item_ids) == 0):
221 | return json.dumps({"error": "Either item_titles or item_ids must be provided"}, indent=4)
222 |
223 | # Find the collection
224 | collection = None
225 | library = None
226 |
227 | # If collection_id is provided, use it to directly fetch the collection
228 | if collection_id:
229 | try:
230 | # Try fetching by ratingKey first
231 | try:
232 | collection = plex.fetchItem(collection_id)
233 | except:
234 | # If that fails, try finding by key in all libraries
235 | collection = None
236 | for section in plex.library.sections():
237 | if section.type in ['movie', 'show']:
238 | try:
239 | for c in section.collections():
240 | if c.ratingKey == collection_id:
241 | collection = c
242 | library = section
243 | break
244 | if collection:
245 | break
246 | except:
247 | continue
248 |
249 | if not collection:
250 | return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
251 | except Exception as e:
252 | return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
253 | else:
254 | # If we're searching by title
255 | if not library_name:
256 | return json.dumps({"error": "Library name is required when adding items by collection title"}, indent=4)
257 |
258 | # Find the library
259 | try:
260 | library = plex.library.section(library_name)
261 | except NotFound:
262 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
263 |
264 | # Find matching collections
265 | matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
266 |
267 | if not matching_collections:
268 | return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
269 |
270 | # If multiple matching collections, return list of matches with IDs
271 | if len(matching_collections) > 1:
272 | matches = []
273 | for c in matching_collections:
274 | matches.append({
275 | "title": c.title,
276 | "id": c.ratingKey,
277 | "library": library_name,
278 | "item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
279 | })
280 |
281 | # Return as a direct array like playlist_list
282 | return json.dumps(matches, indent=4)
283 |
284 | collection = matching_collections[0]
285 |
286 | # Find items to add
287 | items_to_add = []
288 | not_found = []
289 | already_in_collection = []
290 | current_items = collection.items()
291 | current_item_ids = [item.ratingKey for item in current_items]
292 |
293 | # If we have item IDs, try to add by ID first
294 | if item_ids and len(item_ids) > 0:
295 | for item_id in item_ids:
296 | try:
297 | # Try to fetch the item by ID
298 | item = plex.fetchItem(item_id)
299 | if item:
300 | if item.ratingKey in current_item_ids:
301 | already_in_collection.append(str(item_id))
302 | else:
303 | items_to_add.append(item)
304 | else:
305 | not_found.append(str(item_id))
306 | except Exception as e:
307 | not_found.append(str(item_id))
308 |
309 | # If we have item titles, search for them with exact matching
310 | if item_titles and len(item_titles) > 0:
311 | if not library:
312 | # This could happen if we found the collection by ID
313 | # Try to determine which library the collection belongs to
314 | for section in plex.library.sections():
315 | if section.type == 'movie' or section.type == 'show':
316 | try:
317 | for c in section.collections():
318 | if c.ratingKey == collection.ratingKey:
319 | library = section
320 | break
321 | if library:
322 | break
323 | except:
324 | continue
325 |
326 | if not library:
327 | return json.dumps({"error": "Could not determine which library to search in"}, indent=4)
328 |
329 | for title in item_titles:
330 | # Search for the media item with exact matching
331 | search_results = library.search(title=title)
332 |
333 | if search_results:
334 | # Check for exact title match (case insensitive)
335 | exact_matches = [item for item in search_results if item.title.lower() == title.lower()]
336 |
337 | if exact_matches:
338 | item = exact_matches[0]
339 | if item.ratingKey in current_item_ids:
340 | already_in_collection.append(title)
341 | else:
342 | items_to_add.append(item)
343 | else:
344 | # No exact match, collect possible matches
345 | possible_matches = []
346 | for item in search_results:
347 | possible_matches.append({
348 | "title": item.title,
349 | "id": item.ratingKey,
350 | "type": item.type,
351 | "year": item.year if hasattr(item, 'year') and item.year else None
352 | })
353 |
354 | not_found.append({
355 | "title": title,
356 | "possible_matches": possible_matches
357 | })
358 | else:
359 | not_found.append(title)
360 |
361 | # If we have possible matches but no items to add, return the possible matches
362 | if not items_to_add and any(isinstance(item, dict) for item in not_found):
363 | possible_matches_response = []
364 | for item in not_found:
365 | if isinstance(item, dict) and "possible_matches" in item:
366 | for match in item["possible_matches"]:
367 | if match not in possible_matches_response:
368 | possible_matches_response.append(match)
369 |
370 | return json.dumps(possible_matches_response, indent=4)
371 |
372 | # If no items to add and no possible matches
373 | if not items_to_add and not already_in_collection:
374 | return json.dumps({"error": "No matching media items found to add to the collection"}, indent=4)
375 |
376 | # Add items to the collection
377 | if items_to_add:
378 | collection.addItems(items_to_add)
379 |
380 | return json.dumps({
381 | "added": True,
382 | "title": collection.title,
383 | "items_added": [item.title for item in items_to_add],
384 | "items_already_in_collection": already_in_collection,
385 | "items_not_found": [item for item in not_found if not isinstance(item, dict)],
386 | "total_items": len(collection.items())
387 | }, indent=4)
388 | except Exception as e:
389 | return json.dumps({"error": str(e)}, indent=4)
390 |
391 | @mcp.tool()
392 | async def collection_remove_from(collection_title: str = None, collection_id: int = None, library_name: str = None, item_titles: List[str] = None) -> str:
393 | """Remove items from a collection.
394 |
395 | Args:
396 | collection_title: Title of the collection to remove from (optional if collection_id is provided)
397 | collection_id: ID of the collection to remove from (optional if collection_title is provided)
398 | library_name: Name of the library containing the collection (required if using collection_title)
399 | item_titles: List of media titles to remove from the collection
400 | """
401 | try:
402 | plex = connect_to_plex()
403 |
404 | # Validate that at least one identifier is provided
405 | if not collection_id and not collection_title:
406 | return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
407 |
408 | if not item_titles or len(item_titles) == 0:
409 | return json.dumps({"error": "At least one item title must be provided to remove"}, indent=4)
410 |
411 | # Find the collection
412 | collection = None
413 |
414 | # If collection_id is provided, use it to directly fetch the collection
415 | if collection_id:
416 | try:
417 | # Try fetching by ratingKey first
418 | try:
419 | collection = plex.fetchItem(collection_id)
420 | except:
421 | # If that fails, try finding by key in all libraries
422 | collection = None
423 | for section in plex.library.sections():
424 | if section.type in ['movie', 'show']:
425 | try:
426 | for c in section.collections():
427 | if c.ratingKey == collection_id:
428 | collection = c
429 | break
430 | if collection:
431 | break
432 | except:
433 | continue
434 |
435 | if not collection:
436 | return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
437 | except Exception as e:
438 | return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
439 | else:
440 | # If we get here, we're searching by title
441 | if not library_name:
442 | return json.dumps({"error": "Library name is required when removing items by collection title"}, indent=4)
443 |
444 | # Find the library
445 | try:
446 | library = plex.library.section(library_name)
447 | except NotFound:
448 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
449 |
450 | # Find matching collections
451 | matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
452 |
453 | if not matching_collections:
454 | return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
455 |
456 | # If multiple matching collections, return list of matches with IDs
457 | if len(matching_collections) > 1:
458 | matches = []
459 | for c in matching_collections:
460 | matches.append({
461 | "title": c.title,
462 | "id": c.ratingKey,
463 | "library": library_name,
464 | "item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
465 | })
466 |
467 | # Return as a direct array like playlist_list
468 | return json.dumps(matches, indent=4)
469 |
470 | collection = matching_collections[0]
471 |
472 | # Get current items in the collection
473 | collection_items = collection.items()
474 |
475 | # Find items to remove
476 | items_to_remove = []
477 | not_found = []
478 |
479 | for title in item_titles:
480 | found = False
481 | for item in collection_items:
482 | if item.title.lower() == title.lower():
483 | items_to_remove.append(item)
484 | found = True
485 | break
486 | if not found:
487 | not_found.append(title)
488 |
489 | if not items_to_remove:
490 | # No items found to remove, return the current collection contents
491 | current_items = []
492 | for item in collection_items:
493 | current_items.append({
494 | "title": item.title,
495 | "type": item.type,
496 | "id": item.ratingKey
497 | })
498 |
499 | return json.dumps({
500 | "error": "No matching items found in the collection to remove",
501 | "collection_title": collection.title,
502 | "collection_id": collection.ratingKey,
503 | "current_items": current_items
504 | }, indent=4)
505 |
506 | # Remove items from the collection
507 | collection.removeItems(items_to_remove)
508 |
509 | return json.dumps({
510 | "removed": True,
511 | "title": collection.title,
512 | "items_removed": [item.title for item in items_to_remove],
513 | "items_not_found": not_found,
514 | "remaining_items": len(collection.items())
515 | }, indent=4)
516 | except Exception as e:
517 | return json.dumps({"error": str(e)}, indent=4)
518 |
519 | @mcp.tool()
520 | async def collection_delete(collection_title: str = None, collection_id: int = None, library_name: str = None) -> str:
521 | """Delete a collection.
522 |
523 | Args:
524 | collection_title: Title of the collection to delete (optional if collection_id is provided)
525 | collection_id: ID of the collection to delete (optional if collection_title is provided)
526 | library_name: Name of the library containing the collection (required if using collection_title)
527 | """
528 | try:
529 | plex = connect_to_plex()
530 |
531 | # Validate that at least one identifier is provided
532 | if not collection_id and not collection_title:
533 | return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
534 |
535 | # If collection_id is provided, use it to directly fetch the collection
536 | if collection_id:
537 | try:
538 | # Try fetching by ratingKey first
539 | try:
540 | collection = plex.fetchItem(collection_id)
541 | except:
542 | # If that fails, try finding by key in all libraries
543 | collection = None
544 | for section in plex.library.sections():
545 | if section.type in ['movie', 'show']:
546 | try:
547 | for c in section.collections():
548 | if c.ratingKey == collection_id:
549 | collection = c
550 | break
551 | if collection:
552 | break
553 | except:
554 | continue
555 |
556 | if not collection:
557 | return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
558 |
559 | # Get the collection title to return in the message
560 | collection_title_to_return = collection.title
561 |
562 | # Delete the collection
563 | collection.delete()
564 |
565 | # Return a simple object with the result
566 | return json.dumps({
567 | "deleted": True,
568 | "title": collection_title_to_return
569 | }, indent=4)
570 | except Exception as e:
571 | return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
572 |
573 | # If we get here, we're searching by title
574 | if not library_name:
575 | return json.dumps({"error": "Library name is required when deleting by collection title"}, indent=4)
576 |
577 | # Find the library
578 | try:
579 | library = plex.library.section(library_name)
580 | except NotFound:
581 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
582 |
583 | # Find matching collections
584 | matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
585 |
586 | if not matching_collections:
587 | return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
588 |
589 | # If multiple matching collections, return list of matches with IDs
590 | if len(matching_collections) > 1:
591 | matches = []
592 | for c in matching_collections:
593 | matches.append({
594 | "title": c.title,
595 | "id": c.ratingKey,
596 | "library": library_name,
597 | "item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
598 | })
599 |
600 | # Return as a direct array like playlist_list
601 | return json.dumps(matches, indent=4)
602 |
603 | collection = matching_collections[0]
604 | collection_title_to_return = collection.title
605 |
606 | # Delete the collection
607 | collection.delete()
608 |
609 | # Return a simple object with the result
610 | return json.dumps({
611 | "deleted": True,
612 | "title": collection_title_to_return
613 | }, indent=4)
614 | except Exception as e:
615 | return json.dumps({"error": str(e)}, indent=4)
616 |
617 | @mcp.tool()
618 | async def collection_edit(collection_title: str = None, collection_id: int = None, library_name: str = None,
619 | new_title: str = None, new_sort_title: str = None,
620 | new_summary: str = None, new_content_rating: str = None,
621 | new_labels: List[str] = None, add_labels: List[str] = None,
622 | remove_labels: List[str] = None,
623 | poster_path: str = None, poster_url: str = None,
624 | background_path: str = None, background_url: str = None,
625 | new_advanced_settings: Dict[str, Any] = None) -> str:
626 | """Comprehensively edit a collection's attributes.
627 |
628 | Args:
629 | collection_title: Title of the collection to edit (optional if collection_id is provided)
630 | collection_id: ID of the collection to edit (optional if collection_title is provided)
631 | library_name: Name of the library containing the collection (required if using collection_title)
632 | new_title: New title for the collection
633 | new_sort_title: New sort title for the collection
634 | new_summary: New summary/description for the collection
635 | new_content_rating: New content rating (e.g., PG-13, R, etc.)
636 | new_labels: Set completely new labels (replaces existing)
637 | add_labels: Labels to add to existing ones
638 | remove_labels: Labels to remove from existing ones
639 | poster_path: Path to a new poster image file
640 | poster_url: URL to a new poster image
641 | background_path: Path to a new background/art image file
642 | background_url: URL to a new background/art image
643 | new_advanced_settings: Dictionary of advanced settings to apply
644 | """
645 | try:
646 | plex = connect_to_plex()
647 |
648 | # Validate that at least one identifier is provided
649 | if not collection_id and not collection_title:
650 | return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
651 |
652 | # Find the collection
653 | collection = None
654 |
655 | # If collection_id is provided, use it to directly fetch the collection
656 | if collection_id:
657 | try:
658 | # Try fetching by ratingKey first
659 | try:
660 | collection = plex.fetchItem(collection_id)
661 | except:
662 | # If that fails, try finding by key in all libraries
663 | collection = None
664 | for section in plex.library.sections():
665 | if section.type in ['movie', 'show']:
666 | try:
667 | for c in section.collections():
668 | if c.ratingKey == collection_id:
669 | collection = c
670 | break
671 | if collection:
672 | break
673 | except:
674 | continue
675 |
676 | if not collection:
677 | return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
678 | except Exception as e:
679 | return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
680 | else:
681 | # If we get here, we're searching by title
682 | if not library_name:
683 | return json.dumps({"error": "Library name is required when editing by collection title"}, indent=4)
684 |
685 | # Find the library
686 | try:
687 | library = plex.library.section(library_name)
688 | except NotFound:
689 | return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
690 |
691 | # Find matching collections
692 | matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
693 |
694 | if not matching_collections:
695 | return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
696 |
697 | # If multiple matching collections, return list of matches with IDs
698 | if len(matching_collections) > 1:
699 | matches = []
700 | for c in matching_collections:
701 | matches.append({
702 | "title": c.title,
703 | "id": c.ratingKey,
704 | "library": library_name,
705 | "item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
706 | })
707 |
708 | # Return as a direct array like playlist_list
709 | return json.dumps(matches, indent=4)
710 |
711 | collection = matching_collections[0]
712 |
713 | # Track changes
714 | changes = []
715 |
716 | # Edit basic attributes
717 | edit_params = {}
718 |
719 | if new_title is not None and new_title != collection.title:
720 | edit_params['title'] = new_title
721 | changes.append(f"title to '{new_title}'")
722 |
723 | if new_sort_title is not None:
724 | current_sort = getattr(collection, 'titleSort', '')
725 | if new_sort_title != current_sort:
726 | edit_params['titleSort'] = new_sort_title
727 | changes.append(f"sort title to '{new_sort_title}'")
728 |
729 | if new_summary is not None:
730 | current_summary = getattr(collection, 'summary', '')
731 | if new_summary != current_summary:
732 | edit_params['summary'] = new_summary
733 | changes.append("summary")
734 |
735 | if new_content_rating is not None:
736 | current_rating = getattr(collection, 'contentRating', '')
737 | if new_content_rating != current_rating:
738 | edit_params['contentRating'] = new_content_rating
739 | changes.append(f"content rating to '{new_content_rating}'")
740 |
741 | # Apply the basic edits if any parameters were set
742 | if edit_params:
743 | collection.edit(**edit_params)
744 |
745 | # Handle labels
746 | current_labels = getattr(collection, 'labels', [])
747 |
748 | if new_labels is not None:
749 | # Replace all labels
750 | collection.removeLabel(current_labels)
751 | if new_labels:
752 | collection.addLabel(new_labels)
753 | changes.append("labels completely replaced")
754 | else:
755 | # Handle adding and removing individual labels
756 | if add_labels:
757 | for label in add_labels:
758 | if label not in current_labels:
759 | collection.addLabel(label)
760 | changes.append(f"added labels: {', '.join(add_labels)}")
761 |
762 | if remove_labels:
763 | for label in remove_labels:
764 | if label in current_labels:
765 | collection.removeLabel(label)
766 | changes.append(f"removed labels: {', '.join(remove_labels)}")
767 |
768 | # Handle artwork
769 | if poster_path:
770 | collection.uploadPoster(filepath=poster_path)
771 | changes.append("poster (from file)")
772 | elif poster_url:
773 | collection.uploadPoster(url=poster_url)
774 | changes.append("poster (from URL)")
775 |
776 | if background_path:
777 | collection.uploadArt(filepath=background_path)
778 | changes.append("background art (from file)")
779 | elif background_url:
780 | collection.uploadArt(url=background_url)
781 | changes.append("background art (from URL)")
782 |
783 | # Handle advanced settings
784 | if new_advanced_settings:
785 | for key, value in new_advanced_settings.items():
786 | try:
787 | setattr(collection, key, value)
788 | changes.append(f"advanced setting '{key}'")
789 | except Exception as setting_error:
790 | return json.dumps({
791 | "error": f"Error setting advanced parameter '{key}': {str(setting_error)}"
792 | }, indent=4)
793 |
794 | if not changes:
795 | return json.dumps({"updated": False, "message": "No changes made to the collection"}, indent=4)
796 |
797 | # Get the collection title for the response (use new_title if it was changed)
798 | collection_title_to_return = new_title if new_title else collection.title
799 |
800 | return json.dumps({
801 | "updated": True,
802 | "title": collection_title_to_return,
803 | "changes": changes
804 | }, indent=4)
805 | except Exception as e:
806 | return json.dumps({"error": str(e)}, indent=4)
807 |
```
--------------------------------------------------------------------------------
/modules/client.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Client-related functions for Plex Media Server.
3 | Provides tools to connect to clients and control media playback.
4 | """
5 | import json
6 | import time
7 | from typing import List, Dict, Optional, Union, Any
8 |
9 | from modules import mcp, connect_to_plex
10 | from plexapi.exceptions import NotFound, Unauthorized
11 |
12 | @mcp.tool()
13 | async def client_list(include_details: bool = True) -> str:
14 | """List all available Plex clients connected to the server.
15 |
16 | Args:
17 | include_details: Whether to include detailed information about each client
18 |
19 | Returns:
20 | List of client names or detailed info dictionaries
21 | """
22 | try:
23 | plex = connect_to_plex()
24 | clients = plex.clients()
25 |
26 | # Also get session clients which may not appear in clients()
27 | sessions = plex.sessions()
28 | session_clients = []
29 |
30 | # Extract clients from sessions
31 | for session in sessions:
32 | if hasattr(session, 'player') and session.player:
33 | session_clients.append(session.player)
34 |
35 | # Combine both client lists, avoiding duplicates
36 | all_clients = clients.copy()
37 | client_ids = {client.machineIdentifier for client in clients}
38 |
39 | for client in session_clients:
40 | if hasattr(client, 'machineIdentifier') and client.machineIdentifier not in client_ids:
41 | all_clients.append(client)
42 | client_ids.add(client.machineIdentifier)
43 |
44 | if not all_clients:
45 | return json.dumps({
46 | "status": "success",
47 | "message": "No clients currently connected to your Plex server.",
48 | "count": 0,
49 | "clients": []
50 | })
51 |
52 | result = []
53 | if include_details:
54 | for client in all_clients:
55 | result.append({
56 | "name": client.title,
57 | "device": getattr(client, 'device', 'Unknown'),
58 | "model": getattr(client, "model", "Unknown"),
59 | "product": getattr(client, 'product', 'Unknown'),
60 | "version": getattr(client, 'version', 'Unknown'),
61 | "platform": getattr(client, "platform", "Unknown"),
62 | "state": getattr(client, "state", "Unknown"),
63 | "machineIdentifier": getattr(client, 'machineIdentifier', 'Unknown'),
64 | "address": getattr(client, "_baseurl", "Unknown") or getattr(client, "address", "Unknown"),
65 | "protocolCapabilities": getattr(client, "protocolCapabilities", [])
66 | })
67 | else:
68 | result = [client.title for client in all_clients]
69 |
70 | return json.dumps({
71 | "status": "success",
72 | "message": f"Found {len(all_clients)} connected clients",
73 | "count": len(all_clients),
74 | "clients": result
75 | }, indent=2)
76 |
77 | except Exception as e:
78 | return json.dumps({
79 | "status": "error",
80 | "message": f"Error listing clients: {str(e)}"
81 | })
82 |
83 | @mcp.tool()
84 | async def client_get_details(client_name: str) -> str:
85 | """Get detailed information about a specific Plex client.
86 |
87 | Args:
88 | client_name: Name of the client to get details for
89 |
90 | Returns:
91 | Dictionary containing client details
92 | """
93 | try:
94 | plex = connect_to_plex()
95 |
96 | # Get regular clients
97 | regular_clients = plex.clients()
98 |
99 | # Also get clients from sessions
100 | sessions = plex.sessions()
101 | session_clients = []
102 |
103 | # Extract clients from sessions
104 | for session in sessions:
105 | if hasattr(session, 'player') and session.player:
106 | session_clients.append(session.player)
107 |
108 | # Try to find the client first in regular clients
109 | client = None
110 | try:
111 | client = plex.client(client_name)
112 | except NotFound:
113 | # Try to find a client with a matching name in regular clients
114 | matching_clients = [c for c in regular_clients if client_name.lower() in c.title.lower()]
115 | if matching_clients:
116 | client = matching_clients[0]
117 | else:
118 | # Try to find in session clients
119 | matching_session_clients = [c for c in session_clients if
120 | hasattr(c, 'title') and client_name.lower() in c.title.lower()]
121 | if matching_session_clients:
122 | client = matching_session_clients[0]
123 | else:
124 | return json.dumps({
125 | "status": "error",
126 | "message": f"No client found matching '{client_name}'"
127 | })
128 |
129 | client_details = {
130 | "name": client.title,
131 | "device": getattr(client, 'device', 'Unknown'),
132 | "deviceClass": getattr(client, "deviceClass", "Unknown"),
133 | "model": getattr(client, "model", "Unknown"),
134 | "product": getattr(client, 'product', 'Unknown'),
135 | "version": getattr(client, 'version', 'Unknown'),
136 | "platform": getattr(client, "platform", "Unknown"),
137 | "platformVersion": getattr(client, "platformVersion", "Unknown"),
138 | "state": getattr(client, "state", "Unknown"),
139 | "machineIdentifier": getattr(client, 'machineIdentifier', 'Unknown'),
140 | "protocolCapabilities": getattr(client, "protocolCapabilities", []),
141 | "address": getattr(client, "_baseurl", "Unknown") or getattr(client, "address", "Unknown"),
142 | "local": getattr(client, "local", "Unknown"),
143 | "protocol": getattr(client, "protocol", "plex"),
144 | "protocolVersion": getattr(client, "protocolVersion", "Unknown"),
145 | "vendor": getattr(client, "vendor", "Unknown"),
146 | }
147 |
148 | return json.dumps({
149 | "status": "success",
150 | "client": client_details
151 | }, indent=2)
152 |
153 | except Exception as e:
154 | return json.dumps({
155 | "status": "error",
156 | "message": f"Error getting client details: {str(e)}"
157 | })
158 |
159 | @mcp.tool()
160 | async def client_get_timelines(client_name: str) -> str:
161 | """Get the current timeline information for a specific Plex client.
162 |
163 | Args:
164 | client_name: Name of the client to get timeline for
165 |
166 | Returns:
167 | Timeline information for the client
168 | """
169 | try:
170 | plex = connect_to_plex()
171 |
172 | # Get regular clients
173 | regular_clients = plex.clients()
174 |
175 | # Also get clients from sessions
176 | sessions = plex.sessions()
177 | session_clients = []
178 |
179 | # Extract clients from sessions
180 | for session in sessions:
181 | if hasattr(session, 'player') and session.player:
182 | session_clients.append(session.player)
183 |
184 | # Try to find the client first in regular clients
185 | client = None
186 | try:
187 | client = plex.client(client_name)
188 | except NotFound:
189 | # Try to find a client with a matching name in regular clients
190 | matching_clients = [c for c in regular_clients if client_name.lower() in c.title.lower()]
191 | if matching_clients:
192 | client = matching_clients[0]
193 | else:
194 | # Try to find in session clients
195 | matching_session_clients = [c for c in session_clients if
196 | hasattr(c, 'title') and client_name.lower() in c.title.lower()]
197 | if matching_session_clients:
198 | client = matching_session_clients[0]
199 | else:
200 | return json.dumps({
201 | "status": "error",
202 | "message": f"No client found matching '{client_name}'"
203 | })
204 |
205 | # Some clients may not always respond to timeline requests
206 | try:
207 | timeline = client.timeline
208 |
209 | # If timeline is None, the client might not be actively playing anything
210 | if timeline is None:
211 | # Check if this client has an active session
212 | for session in sessions:
213 | if (hasattr(session, 'player') and session.player and
214 | hasattr(session.player, 'machineIdentifier') and
215 | hasattr(client, 'machineIdentifier') and
216 | session.player.machineIdentifier == client.machineIdentifier):
217 | # Use session information instead
218 | session_data = {
219 | "state": session.player.state if hasattr(session.player, 'state') else "Unknown",
220 | "time": session.viewOffset if hasattr(session, 'viewOffset') else 0,
221 | "duration": session.duration if hasattr(session, 'duration') else 0,
222 | "progress": round((session.viewOffset / session.duration * 100) if hasattr(session, 'viewOffset') and
223 | hasattr(session, 'duration') and session.duration else 0, 2),
224 | "title": session.title if hasattr(session, 'title') else "Unknown",
225 | "type": session.type if hasattr(session, 'type') else "Unknown",
226 | }
227 |
228 | return json.dumps({
229 | "status": "success",
230 | "client_name": client.title,
231 | "source": "session",
232 | "timeline": session_data
233 | }, indent=2)
234 |
235 | return json.dumps({
236 | "status": "info",
237 | "message": f"Client '{client.title}' is not currently playing any media.",
238 | "client_name": client.title
239 | })
240 |
241 | # Process timeline data
242 | timeline_data = {
243 | "type": timeline.type,
244 | "state": timeline.state,
245 | "time": timeline.time,
246 | "duration": timeline.duration,
247 | "progress": round((timeline.time / timeline.duration * 100) if timeline.duration else 0, 2),
248 | "key": getattr(timeline, "key", None),
249 | "ratingKey": getattr(timeline, "ratingKey", None),
250 | "playQueueItemID": getattr(timeline, "playQueueItemID", None),
251 | "playbackRate": getattr(timeline, "playbackRate", 1),
252 | "shuffled": getattr(timeline, "shuffled", False),
253 | "repeated": getattr(timeline, "repeated", 0),
254 | "muted": getattr(timeline, "muted", False),
255 | "volume": getattr(timeline, "volume", None),
256 | "title": getattr(timeline, "title", None),
257 | "guid": getattr(timeline, "guid", None),
258 | }
259 |
260 | return json.dumps({
261 | "status": "success",
262 | "client_name": client.title,
263 | "source": "timeline",
264 | "timeline": timeline_data
265 | }, indent=2)
266 | except:
267 | # Check if there's an active session for this client
268 | for session in sessions:
269 | if (hasattr(session, 'player') and session.player and
270 | hasattr(session.player, 'machineIdentifier') and
271 | hasattr(client, 'machineIdentifier') and
272 | session.player.machineIdentifier == client.machineIdentifier):
273 | # Use session information instead
274 | session_data = {
275 | "state": session.player.state if hasattr(session.player, 'state') else "Unknown",
276 | "time": session.viewOffset if hasattr(session, 'viewOffset') else 0,
277 | "duration": session.duration if hasattr(session, 'duration') else 0,
278 | "progress": round((session.viewOffset / session.duration * 100) if hasattr(session, 'viewOffset') and
279 | hasattr(session, 'duration') and session.duration else 0, 2),
280 | "title": session.title if hasattr(session, 'title') else "Unknown",
281 | "type": session.type if hasattr(session, 'type') else "Unknown",
282 | }
283 |
284 | return json.dumps({
285 | "status": "success",
286 | "client_name": client.title,
287 | "source": "session",
288 | "timeline": session_data
289 | }, indent=2)
290 |
291 | return json.dumps({
292 | "status": "warning",
293 | "message": f"Unable to get timeline information for client '{client.title}'. The client may not be responding to timeline requests.",
294 | "client_name": client.title
295 | })
296 |
297 | except Exception as e:
298 | return json.dumps({
299 | "status": "error",
300 | "message": f"Error getting client timeline: {str(e)}"
301 | })
302 |
303 | @mcp.tool()
304 | async def client_get_active() -> str:
305 | """Get all clients that are currently playing media.
306 |
307 | Returns:
308 | List of active clients with their playback status
309 | """
310 | try:
311 | plex = connect_to_plex()
312 |
313 | # Get all sessions
314 | sessions = plex.sessions()
315 |
316 | if not sessions:
317 | return json.dumps({
318 | "status": "success",
319 | "message": "No active playback sessions found.",
320 | "count": 0,
321 | "active_clients": []
322 | })
323 |
324 | active_clients = []
325 |
326 | for session in sessions:
327 | if hasattr(session, 'player') and session.player:
328 | player = session.player
329 |
330 | # Get media information
331 | media_info = {
332 | "title": session.title if hasattr(session, 'title') else "Unknown",
333 | "type": session.type if hasattr(session, 'type') else "Unknown",
334 | }
335 |
336 | # Add additional info based on media type
337 | if hasattr(session, 'type'):
338 | if session.type == 'episode':
339 | media_info["show"] = getattr(session, 'grandparentTitle', 'Unknown Show')
340 | media_info["season"] = getattr(session, 'parentTitle', 'Unknown Season')
341 | media_info["seasonEpisode"] = f"S{getattr(session, 'parentIndex', '?')}E{getattr(session, 'index', '?')}"
342 | elif session.type == 'movie':
343 | media_info["year"] = getattr(session, 'year', 'Unknown')
344 |
345 | # Calculate progress if possible
346 | progress = None
347 | if hasattr(session, 'viewOffset') and hasattr(session, 'duration') and session.duration:
348 | progress = round((session.viewOffset / session.duration) * 100, 1)
349 |
350 | # Get user info
351 | username = "Unknown User"
352 | if hasattr(session, 'usernames') and session.usernames:
353 | username = session.usernames[0]
354 |
355 | # Get transcoding status
356 | transcoding = False
357 | if hasattr(session, 'transcodeSessions') and session.transcodeSessions:
358 | transcoding = True
359 |
360 | client_info = {
361 | "name": player.title,
362 | "device": getattr(player, 'device', 'Unknown'),
363 | "product": getattr(player, 'product', 'Unknown'),
364 | "platform": getattr(player, 'platform', 'Unknown'),
365 | "state": getattr(player, 'state', 'Unknown'),
366 | "user": username,
367 | "media": media_info,
368 | "progress": progress,
369 | "transcoding": transcoding
370 | }
371 |
372 | active_clients.append(client_info)
373 |
374 | return json.dumps({
375 | "status": "success",
376 | "message": f"Found {len(active_clients)} active clients",
377 | "count": len(active_clients),
378 | "active_clients": active_clients
379 | }, indent=2)
380 |
381 | except Exception as e:
382 | return json.dumps({
383 | "status": "error",
384 | "message": f"Error getting active clients: {str(e)}"
385 | })
386 |
387 | @mcp.tool()
388 | async def client_start_playback(media_title: str, client_name: str = None,
389 | offset: int = 0, library_name: str = None,
390 | use_external_player: bool = False) -> str:
391 | """Start playback of media on a specified client.
392 |
393 | Args:
394 | media_title: Title of the media to play
395 | client_name: Optional name of the client to play on (will prompt if not provided)
396 | offset: Optional time offset in milliseconds to start from
397 | library_name: Optional name of the library to search in
398 | use_external_player: Whether to use the client's external player
399 | """
400 | try:
401 | plex = connect_to_plex()
402 |
403 | # First, find the media item
404 | results = []
405 | if library_name:
406 | try:
407 | library = plex.library.section(library_name)
408 | results = library.search(title=media_title)
409 | except Exception:
410 | return json.dumps({
411 | "status": "error",
412 | "message": f"Library '{library_name}' not found"
413 | })
414 | else:
415 | results = plex.search(media_title)
416 |
417 | if not results:
418 | return json.dumps({
419 | "status": "error",
420 | "message": f"No media found matching '{media_title}'"
421 | })
422 |
423 | if len(results) > 1:
424 | # If multiple results, provide information about them
425 | media_list = []
426 | for i, media in enumerate(results[:10], 1): # Limit to first 10 to avoid overwhelming
427 | media_type = getattr(media, 'type', 'unknown')
428 | title = getattr(media, 'title', 'Unknown')
429 | year = getattr(media, 'year', '')
430 |
431 | media_info = {
432 | "index": i,
433 | "title": title,
434 | "type": media_type,
435 | }
436 |
437 | if year:
438 | media_info["year"] = year
439 |
440 | if media_type == 'episode':
441 | show = getattr(media, 'grandparentTitle', 'Unknown Show')
442 | season = getattr(media, 'parentIndex', '?')
443 | episode = getattr(media, 'index', '?')
444 | media_info["show"] = show
445 | media_info["season"] = season
446 | media_info["episode"] = episode
447 |
448 | media_list.append(media_info)
449 |
450 | return json.dumps({
451 | "status": "multiple_results",
452 | "message": f"Multiple items found matching '{media_title}'. Please specify a library or use a more specific title.",
453 | "count": len(results),
454 | "results": media_list
455 | }, indent=2)
456 |
457 | media = results[0]
458 |
459 | # If no client name specified, list available clients
460 | if not client_name:
461 | clients = plex.clients()
462 |
463 | if not clients:
464 | return json.dumps({
465 | "status": "error",
466 | "message": "No clients are currently connected to your Plex server."
467 | })
468 |
469 | client_list = []
470 | for i, client in enumerate(clients, 1):
471 | client_list.append({
472 | "index": i,
473 | "name": client.title,
474 | "device": getattr(client, 'device', 'Unknown')
475 | })
476 |
477 | return json.dumps({
478 | "status": "client_selection",
479 | "message": "Please specify a client to play on using the client_name parameter",
480 | "available_clients": client_list
481 | }, indent=2)
482 |
483 | # Try to find the client
484 | try:
485 | client = plex.client(client_name)
486 | except NotFound:
487 | # Try to find a client with a matching name
488 | matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
489 | if matching_clients:
490 | client = matching_clients[0]
491 | else:
492 | return json.dumps({
493 | "status": "error",
494 | "message": f"No client found matching '{client_name}'"
495 | })
496 |
497 | # Start playback
498 | media_type = getattr(media, 'type', 'unknown')
499 | title = getattr(media, 'title', 'Unknown')
500 |
501 | formatted_title = title
502 | if media_type == 'episode':
503 | show = getattr(media, 'grandparentTitle', 'Unknown Show')
504 | season = getattr(media, 'parentIndex', '?')
505 | episode = getattr(media, 'index', '?')
506 | formatted_title = f"{show} - S{season}E{episode} - {title}"
507 | elif hasattr(media, 'year') and media.year:
508 | formatted_title = f"{title} ({media.year})"
509 |
510 | try:
511 | if use_external_player:
512 | # Open in external player if supported by client
513 | if "Player" in client.protocolCapabilities:
514 | media.playOn(client)
515 | else:
516 | return json.dumps({
517 | "status": "error",
518 | "message": f"Client '{client.title}' does not support external player"
519 | })
520 | else:
521 | # Normal playback
522 | client.playMedia(media, offset=offset)
523 |
524 | return json.dumps({
525 | "status": "success",
526 | "message": f"Started playback of '{formatted_title}' on {client.title}",
527 | "media": {
528 | "title": title,
529 | "type": media_type,
530 | "formatted_title": formatted_title,
531 | "rating_key": getattr(media, 'ratingKey', None)
532 | },
533 | "client": client.title,
534 | "offset": offset
535 | }, indent=2)
536 | except Exception as e:
537 | return json.dumps({
538 | "status": "error",
539 | "message": f"Error starting playback: {str(e)}"
540 | })
541 |
542 | except Exception as e:
543 | return json.dumps({
544 | "status": "error",
545 | "message": f"Error setting up playback: {str(e)}"
546 | })
547 |
548 | @mcp.tool()
549 | async def client_control_playback(client_name: str, action: str,
550 | parameter: int = None, media_type: str = 'video') -> str:
551 | """Control playback on a specified client.
552 |
553 | Args:
554 | client_name: Name of the client to control
555 | action: Action to perform (play, pause, stop, skipNext, skipPrevious,
556 | stepForward, stepBack, seekTo, seekForward, seekBack, mute, unmute, setVolume)
557 | parameter: Parameter for actions that require it (like setVolume or seekTo)
558 | media_type: Type of media being controlled ('video', 'music', or 'photo')
559 | """
560 | try:
561 | plex = connect_to_plex()
562 |
563 | # Validate action
564 | valid_actions = [
565 | 'play', 'pause', 'stop', 'skipNext', 'skipPrevious',
566 | 'stepForward', 'stepBack', 'seekTo', 'seekForward', 'seekBack',
567 | 'mute', 'unmute', 'setVolume'
568 | ]
569 |
570 | if action not in valid_actions:
571 | return json.dumps({
572 | "status": "error",
573 | "message": f"Invalid action '{action}'. Valid actions are: {', '.join(valid_actions)}"
574 | })
575 |
576 | # Check if parameter is needed but not provided
577 | actions_needing_parameter = ['seekTo', 'setVolume']
578 | if action in actions_needing_parameter and parameter is None:
579 | return json.dumps({
580 | "status": "error",
581 | "message": f"Action '{action}' requires a parameter value."
582 | })
583 |
584 | # Validate media type
585 | valid_media_types = ['video', 'music', 'photo']
586 | if media_type not in valid_media_types:
587 | return json.dumps({
588 | "status": "error",
589 | "message": f"Invalid media type '{media_type}'. Valid types are: {', '.join(valid_media_types)}"
590 | })
591 |
592 | # Try to find the client
593 | try:
594 | client = plex.client(client_name)
595 | except NotFound:
596 | # Try to find a client with a matching name
597 | matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
598 | if matching_clients:
599 | client = matching_clients[0]
600 | else:
601 | return json.dumps({
602 | "status": "error",
603 | "message": f"No client found matching '{client_name}'"
604 | })
605 |
606 | # Check if the client has playback control capability
607 | if "playback" not in client.protocolCapabilities:
608 | return json.dumps({
609 | "status": "error",
610 | "message": f"Client '{client.title}' does not support playback control."
611 | })
612 |
613 | # Perform the requested action
614 | try:
615 | # Transport controls
616 | if action == 'play':
617 | client.play()
618 | elif action == 'pause':
619 | client.pause()
620 | elif action == 'stop':
621 | client.stop()
622 | elif action == 'skipNext':
623 | client.skipNext()
624 | elif action == 'skipPrevious':
625 | client.skipPrevious()
626 | elif action == 'stepForward':
627 | client.stepForward()
628 | elif action == 'stepBack':
629 | client.stepBack()
630 |
631 | # Seeking
632 | elif action == 'seekTo':
633 | # Parameter should be milliseconds
634 | client.seekTo(parameter)
635 | elif action == 'seekForward':
636 | # Default to 30 seconds if no parameter
637 | seconds = parameter if parameter is not None else 30
638 | client.seekTo(client.timeline.time + (seconds * 1000))
639 | elif action == 'seekBack':
640 | # Default to 30 seconds if no parameter
641 | seconds = parameter if parameter is not None else 30
642 | seek_time = max(0, client.timeline.time - (seconds * 1000))
643 | client.seekTo(seek_time)
644 |
645 | # Volume controls
646 | elif action == 'mute':
647 | client.mute()
648 | elif action == 'unmute':
649 | client.unmute()
650 | elif action == 'setVolume':
651 | # Parameter should be 0-100
652 | if parameter < 0 or parameter > 100:
653 | return json.dumps({
654 | "status": "error",
655 | "message": "Volume must be between 0 and 100"
656 | })
657 | client.setVolume(parameter)
658 |
659 | # Check timeline to confirm the action (may take a moment to update)
660 | time.sleep(0.5) # Give a short delay for state to update
661 |
662 | # Get updated timeline info
663 | timeline = None
664 | try:
665 | timeline = client.timeline
666 | if timeline:
667 | timeline_data = {
668 | "state": timeline.state,
669 | "time": timeline.time,
670 | "duration": timeline.duration,
671 | "volume": getattr(timeline, "volume", None),
672 | "muted": getattr(timeline, "muted", None)
673 | }
674 | else:
675 | timeline_data = None
676 | except:
677 | timeline_data = None
678 |
679 | return json.dumps({
680 | "status": "success",
681 | "message": f"Successfully performed action '{action}' on client '{client.title}'",
682 | "action": action,
683 | "client": client.title,
684 | "parameter": parameter,
685 | "timeline": timeline_data
686 | }, indent=2)
687 |
688 | except Exception as e:
689 | return json.dumps({
690 | "status": "error",
691 | "message": f"Error controlling playback: {str(e)}"
692 | })
693 |
694 | except Exception as e:
695 | return json.dumps({
696 | "status": "error",
697 | "message": f"Error setting up playback control: {str(e)}"
698 | })
699 |
700 | @mcp.tool()
701 | async def client_navigate(client_name: str, action: str) -> str:
702 | """Navigate a Plex client interface.
703 |
704 | Args:
705 | client_name: Name of the client to navigate
706 | action: Navigation action to perform (moveUp, moveDown, moveLeft, moveRight,
707 | select, back, home, contextMenu)
708 | """
709 | try:
710 | plex = connect_to_plex()
711 |
712 | # Validate action
713 | valid_actions = [
714 | 'moveUp', 'moveDown', 'moveLeft', 'moveRight',
715 | 'select', 'back', 'home', 'contextMenu'
716 | ]
717 |
718 | if action not in valid_actions:
719 | return json.dumps({
720 | "status": "error",
721 | "message": f"Invalid navigation action '{action}'. Valid actions are: {', '.join(valid_actions)}"
722 | })
723 |
724 | # Try to find the client
725 | try:
726 | client = plex.client(client_name)
727 | except NotFound:
728 | # Try to find a client with a matching name
729 | matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
730 | if matching_clients:
731 | client = matching_clients[0]
732 | else:
733 | return json.dumps({
734 | "status": "error",
735 | "message": f"No client found matching '{client_name}'"
736 | })
737 |
738 | # Check if the client has navigation capability
739 | if "navigation" not in client.protocolCapabilities:
740 | return json.dumps({
741 | "status": "error",
742 | "message": f"Client '{client.title}' does not support navigation control."
743 | })
744 |
745 | # Perform the requested action
746 | try:
747 | if action == 'moveUp':
748 | client.moveUp()
749 | elif action == 'moveDown':
750 | client.moveDown()
751 | elif action == 'moveLeft':
752 | client.moveLeft()
753 | elif action == 'moveRight':
754 | client.moveRight()
755 | elif action == 'select':
756 | client.select()
757 | elif action == 'back':
758 | client.goBack()
759 | elif action == 'home':
760 | client.goToHome()
761 | elif action == 'contextMenu':
762 | client.contextMenu()
763 |
764 | return json.dumps({
765 | "status": "success",
766 | "message": f"Successfully performed navigation action '{action}' on client '{client.title}'",
767 | "action": action,
768 | "client": client.title
769 | }, indent=2)
770 |
771 | except Exception as e:
772 | return json.dumps({
773 | "status": "error",
774 | "message": f"Error navigating client: {str(e)}"
775 | })
776 |
777 | except Exception as e:
778 | return json.dumps({
779 | "status": "error",
780 | "message": f"Error setting up client navigation: {str(e)}"
781 | })
782 |
783 | @mcp.tool()
784 | async def client_set_streams(client_name: str, audio_stream_id: str = None,
785 | subtitle_stream_id: str = None, video_stream_id: str = None) -> str:
786 | """Set audio, subtitle, or video streams for current playback on a client.
787 |
788 | Args:
789 | client_name: Name of the client to set streams for
790 | audio_stream_id: ID of the audio stream to switch to
791 | subtitle_stream_id: ID of the subtitle stream to switch to, use '0' to disable
792 | video_stream_id: ID of the video stream to switch to
793 | """
794 | try:
795 | plex = connect_to_plex()
796 |
797 | # Check if at least one stream ID is provided
798 | if audio_stream_id is None and subtitle_stream_id is None and video_stream_id is None:
799 | return json.dumps({
800 | "status": "error",
801 | "message": "At least one stream ID (audio, subtitle, or video) must be provided."
802 | })
803 |
804 | # Try to find the client
805 | try:
806 | client = plex.client(client_name)
807 | except NotFound:
808 | # Try to find a client with a matching name
809 | matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
810 | if matching_clients:
811 | client = matching_clients[0]
812 | else:
813 | return json.dumps({
814 | "status": "error",
815 | "message": f"No client found matching '{client_name}'"
816 | })
817 |
818 | # Check if client is currently playing
819 | timeline = None
820 | try:
821 | timeline = client.timeline
822 | if timeline is None or not hasattr(timeline, 'state') or timeline.state != 'playing':
823 | # Check active sessions to see if this client has a session
824 | sessions = plex.sessions()
825 | client_session = None
826 |
827 | for session in sessions:
828 | if (hasattr(session, 'player') and session.player and
829 | hasattr(session.player, 'machineIdentifier') and
830 | hasattr(client, 'machineIdentifier') and
831 | session.player.machineIdentifier == client.machineIdentifier):
832 | client_session = session
833 | break
834 |
835 | if not client_session:
836 | return json.dumps({
837 | "status": "error",
838 | "message": f"Client '{client.title}' is not currently playing any media."
839 | })
840 | except:
841 | return json.dumps({
842 | "status": "error",
843 | "message": f"Unable to get playback status for client '{client.title}'."
844 | })
845 |
846 | # Set streams
847 | changed_streams = []
848 | try:
849 | if audio_stream_id is not None:
850 | client.setAudioStream(audio_stream_id)
851 | changed_streams.append(f"audio to {audio_stream_id}")
852 |
853 | if subtitle_stream_id is not None:
854 | client.setSubtitleStream(subtitle_stream_id)
855 | changed_streams.append(f"subtitle to {subtitle_stream_id}")
856 |
857 | if video_stream_id is not None:
858 | client.setVideoStream(video_stream_id)
859 | changed_streams.append(f"video to {video_stream_id}")
860 |
861 | return json.dumps({
862 | "status": "success",
863 | "message": f"Successfully set streams for '{client.title}': {', '.join(changed_streams)}",
864 | "client": client.title,
865 | "changes": {
866 | "audio_stream": audio_stream_id if audio_stream_id is not None else None,
867 | "subtitle_stream": subtitle_stream_id if subtitle_stream_id is not None else None,
868 | "video_stream": video_stream_id if video_stream_id is not None else None
869 | }
870 | }, indent=2)
871 | except Exception as e:
872 | return json.dumps({
873 | "status": "error",
874 | "message": f"Error setting streams: {str(e)}"
875 | })
876 |
877 | except Exception as e:
878 | return json.dumps({
879 | "status": "error",
880 | "message": f"Error setting up stream selection: {str(e)}"
881 | })
```