This is page 1 of 2. Use http://codebase.md/vladimir-tutin/plex-mcp-server?page={x} to view the full context.
# Directory Structure
```
├── .env.example
├── .gitignore
├── .vscode
│ └── launch.json
├── modules
│ ├── __init__.py
│ ├── client.py
│ ├── collection.py
│ ├── library.py
│ ├── media.py
│ ├── playlist.py
│ ├── search.py
│ ├── server.py
│ ├── sessions.py
│ └── user.py
├── plex_mcp_server.py
├── README.md
├── requirements.txt
└── watcher.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.env
__pycache__
/modules/__pycache__
.venv
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
PLEX_URL=https://app.plex.tv
PLEX_TOKEN=TOKEN
PLEX_USERNAME=USERNAME
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Plex MCP Server
A powerful Model-Controller-Protocol server for interacting with Plex Media Server, providing a standardized JSON-based interface for automation, scripting, and integration with other tools.
## Overview
Plex MCP Server creates a unified API layer on top of the Plex Media Server API, offering:
- **Standardized JSON responses** for compatibility with automation tools, AI systems, and other integrations
- **Multiple transport methods** (stdio and SSE) for flexible integration options
- **Rich command set** for managing libraries, collections, playlists, media, users, and more
- **Error handling** with consistent response formats
- **Easy integration** with automation platforms (like n8n) and custom scripts
## Requirements
- Python 3.8+
- Plex Media Server with valid authentication token
- Access to the Plex server (locally or remotely)
## Installation
1. Clone this repository
2. Install the required dependencies:
```
pip install -r requirements.txt
```
3. Create a `.env` file based on the `.env.example`:
```
cp .env.example .env
```
4. Add your Plex server URL and token to the `.env` file:
```
PLEX_URL=http://your-plex-server:32400
PLEX_TOKEN=your-plex-token
```
## Usage
The server can be run in two transport modes: stdio (Standard Input/Output) or SSE (Server-Sent Events). Each mode is suitable for different integration scenarios.
### Running with stdio Transport
The stdio transport is ideal for direct integration with applications like Claude Desktop or Cursor. It accepts commands via standard input and outputs results to standard output in JSON format.
Basic command line usage:
```bash
python3 -m plex_mcp
```
or
```bash
python3 plex_mcp_server.py --transport stdio
```
#### Configuration Example for Claude Desktop/Cursor
Add this configuration to your application's settings:
```json
{
"plex": {
"command": "python",
"args": [
"C://Users//User//Documents//plex-mcp-server//plex_mcp_server.py",
"--transport=stdio"
],
"env": {
"PLEX_URL":"http://localhost:32400",
"PLEX_TOKEN":"av3khi56h634v3",
"PLEX_USERNAME:"Administrator"
}
}
}
```
### Running with SSE Transport
The Server-Sent Events (SSE) transport provides a web-based interface for integration with web applications and services.
Start the server:
```bash
python3 plex_mcp_server.py --transport sse --host 0.0.0.0 --port 3001
```
Default options:
- Host: 0.0.0.0 (accessible from any network interface)
- Port: 3001
- SSE endpoint: `/sse`
- Message endpoint: `/messages/`
#### Configuration Example for SSE Client
When the server is running in SSE mode, configure your client to connect using:
```json
{
"plex": {
"url": "http://localhost:3001/sse"
}
}
```
With SSE, you can connect to the server via web applications or tools that support SSE connections.
## Command Modules
### Library Module
- List libraries
- Get library statistics
- Refresh libraries
- Scan for new content
- Get library details
- Get recently added content
- Get library contents
### Media Module
- Search for media
- Get detailed media information
- Edit media metadata
- Delete media
- Get and set artwork
- List available artwork
### Playlist Module
- List playlists
- Get playlist contents
- Create playlists
- Delete playlists
- Add items to playlists
- Remove items from playlists
- Edit playlists
- Upload custom poster images
- Copy playlists to other users
### Collection Module
- List collections
- Create collections
- Add items to collections
- Remove items from collections
- Edit collections
### User Module
- Search for users
- Get user information
- Get user's on deck content
- Get user watch history
### Sessions Module
- Get active sessions
- Get media playback history
### Server Module
- Get Plex server logs
- Get server information
- Get bandwidth statistics
- Get current resource usage
- Get and run butler tasks
- Get server alerts
### Client Module
- List clients
- Get client details
- Get client timelines
- Get active clients
- Start media playback
- Control playback (play, pause, etc.)
- Navigate client interfaces
- Set audio/subtitle streams
**Note:** The Client Module functionality is currently limited and not fully implemented. Some features may not work as expected or may be incomplete.
## Response Format
All commands return standardized JSON responses for maximum compatibility with various tools, automation platforms, and AI systems. This consistent structure makes it easy to process responses programmatically.
For successful operations, the response typically includes:
```json
{
"success_field": true,
"relevant_data": "value",
"additional_info": {}
}
```
For errors, the response format is:
```json
{
"error": "Error message describing what went wrong"
}
```
For multiple matches (when searching by title), results are returned as an array of objects with identifying information:
```json
[
{
"title": "Item Title",
"id": 12345,
"type": "movie",
"year": 2023
},
{
"title": "Another Item",
"id": 67890,
"type": "show",
"year": 2022
}
]
```
## Debugging
For development and debugging, you can use the included `watcher.py` script which monitors for changes and automatically restarts the server.
## License
[Include your license information here]
```
--------------------------------------------------------------------------------
/modules/search.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Optional
from modules import mcp, connect_to_plex
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
plexapi>=4.15.0
starlette>=0.28.0
uvicorn>=0.23.0
requests>=2.31.0
python-dotenv>=1.0.0
mcp
```
--------------------------------------------------------------------------------
/.vscode/launch.json:
--------------------------------------------------------------------------------
```json
{
"configurations": [
{
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${file}"
}
]
}
```
--------------------------------------------------------------------------------
/modules/__init__.py:
--------------------------------------------------------------------------------
```python
import os
import time
from mcp.server.fastmcp import FastMCP # type: ignore
from plexapi.server import PlexServer # type: ignore
from plexapi.myplex import MyPlexAccount # type: ignore
# Add dotenv for .env file support
try:
from dotenv import load_dotenv # type: ignore
# Load environment variables from .env file
load_dotenv()
print("Successfully loaded environment variables from .env file")
except ImportError:
print("Warning: python-dotenv not installed. Environment variables won't be loaded from .env file.")
print("Install with: pip install python-dotenv")
# Initialize FastMCP server
mcp = FastMCP("plex-server")
# Global variables for Plex connection
plex_url = os.environ.get("PLEX_URL", "")
plex_token = os.environ.get("PLEX_TOKEN", "")
server = None
last_connection_time = 0
CONNECTION_TIMEOUT = 30 # seconds
SESSION_TIMEOUT = 60 * 30 # 30 minutes
def connect_to_plex() -> PlexServer:
"""Connect to Plex server using environment variables or stored credentials.
Returns a PlexServer instance with automatic reconnection if needed.
"""
global server, last_connection_time
current_time = time.time()
# Check if we have a valid connection
if server is not None:
# If we've connected recently, reuse the connection
if current_time - last_connection_time < SESSION_TIMEOUT:
# Verify the connection is still alive with a simple request
try:
# Simple API call to verify the connection
server.library.sections()
last_connection_time = current_time
return server
except:
# Connection failed, reset and create a new one
server = None
# Create a new connection
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
# Try connecting directly with a token
if plex_token:
server = PlexServer(plex_url, plex_token, timeout=CONNECTION_TIMEOUT)
last_connection_time = current_time
return server
# If no direct connection, try with MyPlex account
username = os.environ.get("PLEX_USERNAME")
password = os.environ.get("PLEX_PASSWORD")
server_name = os.environ.get("PLEX_SERVER_NAME")
if username and password and server_name:
account = MyPlexAccount(username, password)
# Use the plex_token if available to avoid resource.connect()
# which can be problematic
for resource in account.resources():
if resource.name.lower() == server_name.lower() and resource.provides == 'server':
if resource.connections:
# Try each connection until one works
for connection in resource.connections:
try:
server = PlexServer(connection.uri, account.authenticationToken, timeout=CONNECTION_TIMEOUT)
last_connection_time = current_time
return server
except:
continue
# If we get here, none of the connection attempts worked
# Fall back to resource.connect() as a last resort
server = account.resource(server_name).connect(timeout=CONNECTION_TIMEOUT)
last_connection_time = current_time
return server
raise ValueError("Insufficient Plex credentials provided")
except Exception as e:
if attempt == max_retries - 1: # Last attempt failed
raise ValueError(f"Failed to connect to Plex after {max_retries} attempts: {str(e)}")
# Wait before retrying
time.sleep(retry_delay)
# We shouldn't get here but just in case
raise ValueError("Failed to connect to Plex server")
```
--------------------------------------------------------------------------------
/plex_mcp_server.py:
--------------------------------------------------------------------------------
```python
import argparse
import uvicorn # type: ignore
from starlette.applications import Starlette # type: ignore
from starlette.routing import Mount, Route # type: ignore
from mcp.server import Server # type: ignore
from mcp.server.sse import SseServerTransport # type: ignore
from starlette.requests import Request # type: ignore
# Import the main mcp instance from modules
from modules import mcp, connect_to_plex
# Import all tools to ensure they are registered with MCP
# Library module functions
from modules.library import (
library_list,
library_get_stats,
library_refresh,
library_scan,
library_get_details,
library_get_recently_added,
library_get_contents
)
# User module functions
from modules.user import (
user_search_users,
user_get_info,
user_get_on_deck,
user_get_watch_history,
user_get_statistics
)
# Search module functions
from modules.sessions import (
sessions_get_active,
sessions_get_media_playback_history
)
# Server module functions
from modules.server import (
server_get_plex_logs,
server_get_info,
server_get_bandwidth,
server_get_current_resources,
server_get_butler_tasks,
server_get_alerts,
server_run_butler_task
)
# Playlist module functions
from modules.playlist import (
playlist_list,
playlist_get_contents,
playlist_create,
playlist_delete,
playlist_add_to,
playlist_remove_from,
playlist_edit,
playlist_upload_poster,
playlist_copy_to_user
)
# Collection module functions
from modules.collection import (
collection_list,
collection_create,
collection_add_to,
collection_remove_from,
collection_edit
)
# Media module functions
from modules.media import (
media_search,
media_get_details,
media_edit_metadata,
media_delete,
media_get_artwork,
media_set_artwork,
media_list_available_artwork
)
# Client module functions
from modules.client import (
client_list,
client_get_details,
client_get_timelines,
client_get_active,
client_start_playback,
client_control_playback,
client_navigate,
client_set_streams
)
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can serve the provided mcp server with SSE."""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send, # noqa: SLF001
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
if __name__ == "__main__":
# Setup command line arguments
parser = argparse.ArgumentParser(description='Run Plex MCP Server')
parser.add_argument('--transport', choices=['stdio', 'sse'], default='sse',
help='Transport method to use (stdio or sse)')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to (for SSE)')
parser.add_argument('--port', type=int, default=3001, help='Port to listen on (for SSE)')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
# Initialize and run the server
print(f"Starting Plex MCP Server with {args.transport} transport...")
print("Set PLEX_URL and PLEX_TOKEN environment variables for connection")
if args.transport == 'stdio':
# Run with stdio transport (original method)
mcp.run(transport='stdio')
else:
# Run with SSE transport
mcp_server = mcp._mcp_server # Access the underlying MCP server
starlette_app = create_starlette_app(mcp_server, debug=args.debug)
print(f"Starting SSE server on http://{args.host}:{args.port}")
print("Access the SSE endpoint at /sse")
uvicorn.run(starlette_app, host=args.host, port=args.port)
```
--------------------------------------------------------------------------------
/watcher.py:
--------------------------------------------------------------------------------
```python
import time
import os
import sys
import subprocess
import argparse
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Default paths and configuration
SERVER_PATH = os.getcwd() # Current working directory
MODULES_PATH = os.path.join(SERVER_PATH, "modules") # Modules subdirectory
SERVER_MODULE = "plex_mcp_server" # Correct module name
class MCPServerHandler(FileSystemEventHandler):
def __init__(self, transport=None, host=None, port=None):
self.process = None
self.transport = transport
self.host = host
self.port = port
self.start_server()
def start_server(self):
if self.process:
print("Forcefully stopping server...")
try:
# First try SIGTERM
self.process.terminate()
# Give it a short time to terminate
for _ in range(3):
if self.process.poll() is not None:
break # Process terminated
time.sleep(0.1)
# If still running, force kill
if self.process.poll() is None:
print("Server still running, killing forcefully...")
self.process.kill()
# Wait for process to be fully killed
self.process.wait()
except Exception as e:
print(f"Error stopping server: {e}")
# In case the process is still running, try one more approach (platform specific)
try:
if self.process.poll() is None and hasattr(os, 'killpg'):
import signal
os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
except Exception:
pass
command = [sys.executable, "-m", SERVER_MODULE]
# Add command line arguments if provided
if self.transport:
command.extend(["--transport", self.transport])
if self.host:
command.extend(["--host", self.host])
if self.port:
command.extend(["--port", str(self.port)])
print(f"Starting server with command: {' '.join(command)}")
# Create the process in its own process group so we can kill it and all its children
if hasattr(subprocess, 'CREATE_NEW_PROCESS_GROUP') and sys.platform == 'win32':
# Windows-specific flag
self.process = subprocess.Popen(
command,
cwd=SERVER_PATH,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
)
else:
# Unix-based systems
self.process = subprocess.Popen(
command,
cwd=SERVER_PATH,
preexec_fn=os.setsid if hasattr(os, 'setsid') else None
)
def on_modified(self, event):
if event.src_path.endswith('.py'):
print(f"Change detected in {event.src_path}")
self.start_server()
if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description="Watch for changes in MCP server files and restart the server")
parser.add_argument("--transport", help="Transport type (e.g., http, websocket)")
parser.add_argument("--host", help="Host address to bind to")
parser.add_argument("--port", help="Port to bind to")
args = parser.parse_args()
# Create event handler with provided arguments
event_handler = MCPServerHandler(
transport=args.transport,
host=args.host,
port=args.port
)
# Set up observers for both main directory and modules subdirectory
observer = Observer()
observer.schedule(event_handler, SERVER_PATH, recursive=False)
# Make sure modules directory exists before watching it
if os.path.exists(MODULES_PATH) and os.path.isdir(MODULES_PATH):
observer.schedule(event_handler, MODULES_PATH, recursive=True)
else:
print(f"Warning: Modules directory {MODULES_PATH} not found, only watching main directory")
observer.start()
print(f"Watching for changes in {SERVER_PATH} and {MODULES_PATH} (if exists)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping watcher...")
observer.stop()
if event_handler.process:
print("Forcefully stopping server...")
try:
# Try SIGTERM first
event_handler.process.terminate()
# Give it a short time to terminate
for _ in range(3):
if event_handler.process.poll() is not None:
break
time.sleep(0.1)
# If still running, force kill
if event_handler.process.poll() is None:
print("Server still running, killing forcefully...")
event_handler.process.kill()
# Try process group kill as a last resort
if event_handler.process.poll() is None and hasattr(os, 'killpg'):
import signal
os.killpg(os.getpgid(event_handler.process.pid), signal.SIGKILL)
except Exception as e:
print(f"Error while stopping server: {e}")
observer.join()
```
--------------------------------------------------------------------------------
/modules/sessions.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Optional
from modules import mcp, connect_to_plex
# Functions for sessions and playback
@mcp.tool()
async def sessions_get_active(unused: str = None) -> str:
"""Get information about current playback sessions, including IP addresses.
Args:
unused: Unused parameter to satisfy the function signature
"""
try:
plex = connect_to_plex()
# Get all active sessions
sessions = plex.sessions()
if not sessions:
return json.dumps({
"status": "success",
"message": "No active sessions found.",
"sessions_count": 0,
"sessions": []
})
sessions_data = []
transcode_count = 0
direct_play_count = 0
total_bitrate = 0
for session in enumerate(sessions, 1):
i, session = session
# Basic media information
item_type = getattr(session, 'type', 'unknown')
title = getattr(session, 'title', 'Unknown')
# Session information
player = getattr(session, 'player', None)
user = getattr(session, 'usernames', ['Unknown User'])[0]
session_info = {
"session_id": i,
"state": player.state,
"player_name": player.title,
"user": user,
"content_type": item_type,
"player": {},
"progress": {}
}
# Media-specific information
if item_type == 'episode':
show_title = getattr(session, 'grandparentTitle', 'Unknown Show')
season_num = getattr(session, 'parentIndex', '?')
episode_num = getattr(session, 'index', '?')
session_info["content_description"] = f"{show_title} - S{season_num}E{episode_num} - {title} (TV Episode)"
elif item_type == 'movie':
year = getattr(session, 'year', '')
session_info["year"] = year
session_info["content_description"] = f"{title} ({year}) (Movie)"
else:
session_info["content_description"] = f"{title} ({item_type})"
# Player information
if player:
player_info = {
}
# Add IP address if available
if hasattr(player, 'address'):
player_info["ip"] = player.address
# Add platform information if available
if hasattr(player, 'platform'):
player_info["platform"] = player.platform
# Add product information if available
if hasattr(player, 'product'):
player_info["product"] = player.product
# Add device information if available
if hasattr(player, 'device'):
player_info["device"] = player.device
# Add version information if available
if hasattr(player, 'version'):
player_info["version"] = player.version
session_info["player"] = player_info
# Add playback information
if hasattr(session, 'viewOffset') and hasattr(session, 'duration'):
progress = (session.viewOffset / session.duration) * 100
seconds_remaining = (session.duration - session.viewOffset) / 1000
minutes_remaining = seconds_remaining / 60
session_info["progress"] = {
"percent": round(progress, 1),
"minutes_remaining": int(minutes_remaining) if minutes_remaining > 1 else 0
}
# Add quality information if available
if hasattr(session, 'media') and session.media:
media = session.media[0] if isinstance(session.media, list) and session.media else session.media
media_info = {}
bitrate = getattr(media, 'bitrate', None)
if bitrate:
media_info["bitrate"] = f"{bitrate} kbps"
# Add to total bitrate
try:
total_bitrate += int(bitrate)
except (TypeError, ValueError):
pass
resolution = getattr(media, 'videoResolution', None)
if resolution:
media_info["resolution"] = resolution
if media_info:
session_info["media_info"] = media_info
# Transcoding information
transcode_session = getattr(session, 'transcodeSessions', None)
if transcode_session:
transcode = transcode_session[0] if isinstance(transcode_session, list) else transcode_session
transcode_info = {"active": True}
# Add source vs target information if available
if hasattr(transcode, 'sourceVideoCodec') and hasattr(transcode, 'videoCodec'):
transcode_info["video"] = f"{transcode.sourceVideoCodec} → {transcode.videoCodec}"
if hasattr(transcode, 'sourceAudioCodec') and hasattr(transcode, 'audioCodec'):
transcode_info["audio"] = f"{transcode.sourceAudioCodec} → {transcode.audioCodec}"
if hasattr(transcode, 'sourceResolution') and hasattr(transcode, 'width') and hasattr(transcode, 'height'):
transcode_info["resolution"] = f"{transcode.sourceResolution} → {transcode.width}x{transcode.height}"
session_info["transcoding"] = transcode_info
transcode_count += 1
else:
session_info["transcoding"] = {"active": False, "mode": "Direct Play/Stream"}
direct_play_count += 1
sessions_data.append(session_info)
return json.dumps({
"status": "success",
"message": f"Found {len(sessions)} active sessions",
"sessions_count": len(sessions),
"transcode_count": transcode_count,
"direct_play_count": direct_play_count,
"total_bitrate_kbps": total_bitrate,
"sessions": sessions_data
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error getting active sessions: {str(e)}"
})
@mcp.tool()
async def sessions_get_media_playback_history(media_title: str = None, library_name: str = None, media_id: int = None) -> str:
"""Get playback history for a specific media item.
Args:
media_title: Title of the media to get history for (optional if media_id is provided)
library_name: Optional library name to limit search to
media_id: Plex media ID/rating key to directly fetch the item (optional if media_title is provided)
"""
try:
plex = connect_to_plex()
# Check if we have at least one identifier
if not media_title and not media_id:
return json.dumps({
"status": "error",
"message": "Either media_title or media_id must be provided."
})
media = None
results = []
# If media_id is provided, try to fetch the item directly
if media_id:
try:
# fetchItem takes a rating key and returns the media object
media = plex.fetchItem(media_id)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Media with ID '{media_id}' not found: {str(e)}"
})
# Otherwise search by title
elif media_title:
if library_name:
try:
library = plex.library.section(library_name)
results = library.search(title=media_title)
except Exception:
return json.dumps({
"status": "error",
"message": f"Library '{library_name}' not found."
})
else:
results = plex.search(media_title)
if not results:
return json.dumps({
"status": "error",
"message": f"No media found matching '{media_title}'."
})
# If we have multiple results, provide details about each match
if len(results) > 1:
matches = []
for item in results:
item_info = {
"media_id": item.ratingKey,
"type": getattr(item, 'type', 'unknown'),
"title": item.title
}
# Add type-specific info
if item.type == 'episode':
item_info["show_title"] = getattr(item, 'grandparentTitle', 'Unknown Show')
item_info["season"] = getattr(item, 'parentTitle', 'Unknown Season')
item_info["season_number"] = getattr(item, 'parentIndex', '?')
item_info["episode_number"] = getattr(item, 'index', '?')
item_info["formatted_title"] = f"{item_info['show_title']} - S{item_info['season_number']}E{item_info['episode_number']} - {item.title}"
elif item.type == 'movie':
year = getattr(item, 'year', '')
if year:
item_info["year"] = year
item_info["formatted_title"] = f"{item.title} ({year})" if year else item.title
matches.append(item_info)
return json.dumps({
"status": "multiple_matches",
"message": f"Multiple items found with title '{media_title}'. Please specify a library, use a more specific title, or use one of the media_id values below.",
"matches": matches
}, indent=2)
media = results[0]
media_type = getattr(media, 'type', 'unknown')
# Format title differently based on media type
media_info = {
"media_id": media.ratingKey,
"key": media.key
}
if media_type == 'episode':
show = getattr(media, 'grandparentTitle', 'Unknown Show')
season = getattr(media, 'parentTitle', 'Unknown Season')
formatted_title = f"{show} - {season} - {media.title}"
media_info["show_title"] = show
media_info["season_title"] = season
media_info["episode_title"] = media.title
else:
year = getattr(media, 'year', '')
year_str = f" ({year})" if year else ""
formatted_title = f"{media.title}{year_str}"
media_info["title"] = media.title
if year:
media_info["year"] = year
media_info["type"] = media_type
media_info["formatted_title"] = formatted_title
# Get the history using the history() method
try:
history_items = media.history()
if not history_items:
return json.dumps({
"status": "success",
"message": f"No playback history found for '{formatted_title}'.",
"media": media_info,
"play_count": 0,
"history": []
})
history_data = []
for item in history_items:
history_entry = {}
# Get the username if available
account_id = getattr(item, 'accountID', None)
account_name = "Unknown User"
# Try to get the account name from the accountID
if account_id:
try:
# This may not work unless we have admin privileges
account = plex.myPlexAccount()
if account.id == account_id:
account_name = account.title
else:
for user in account.users():
if user.id == account_id:
account_name = user.title
break
except:
# If we can't get the account name, just use the ID
account_name = f"User ID: {account_id}"
history_entry["user"] = account_name
# Get the timestamp when it was viewed
viewed_at = getattr(item, 'viewedAt', None)
viewed_at_str = viewed_at.strftime("%Y-%m-%d %H:%M") if viewed_at else "Unknown time"
history_entry["viewed_at"] = viewed_at_str
# Device information if available
device_id = getattr(item, 'deviceID', None)
device_name = "Unknown Device"
# Try to resolve device name using systemDevice method
if device_id:
try:
device = plex.systemDevice(device_id)
if device and hasattr(device, 'name'):
device_name = device.name
except Exception:
# If we can't resolve the device name, just use the ID
device_name = f"Device ID: {device_id}"
history_entry["device"] = device_name
history_data.append(history_entry)
return json.dumps({
"status": "success",
"media": media_info,
"play_count": len(history_items),
"history": history_data
}, indent=2)
except AttributeError:
# Fallback if history() method is not available
# Get basic view information
view_count = getattr(media, 'viewCount', 0) or 0
last_viewed_at = getattr(media, 'lastViewedAt', None)
if view_count == 0:
return json.dumps({
"status": "success",
"message": f"No one has watched '{formatted_title}' yet.",
"media": media_info,
"play_count": 0
})
result = {
"status": "success",
"media": media_info,
"play_count": view_count,
}
if last_viewed_at:
last_viewed_str = last_viewed_at.strftime("%Y-%m-%d %H:%M") if hasattr(last_viewed_at, 'strftime') else str(last_viewed_at)
result["last_viewed"] = last_viewed_str
# Add any additional account info if available
account_info = getattr(media, 'viewedBy', [])
if account_info:
result["viewed_by"] = [account.title for account in account_info]
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error getting media playback history: {str(e)}"
})
```
--------------------------------------------------------------------------------
/modules/server.py:
--------------------------------------------------------------------------------
```python
from modules import mcp, connect_to_plex
import os
from typing import Dict, List, Any, Optional
import json
import asyncio
import requests
@mcp.tool()
async def server_get_plex_logs(num_lines: int = 100, log_type: str = "server") -> str:
"""Get Plex server logs.
Args:
num_lines: Number of log lines to retrieve
log_type: Type of log to retrieve (server, scanner, transcoder, updater)
"""
try:
import zipfile
import io
import tempfile
import os
import shutil
import traceback
plex = connect_to_plex()
# Map common log type names to the actual file names
log_type_map = {
'server': 'Plex Media Server.log',
'scanner': 'Plex Media Scanner.log',
'transcoder': 'Plex Transcoder.log',
'updater': 'Plex Update Service.log'
}
log_file_name = log_type_map.get(log_type.lower(), log_type)
# Download logs from the Plex server
logs_path_or_data = plex.downloadLogs()
# Handle zipfile content based on what we received
if isinstance(logs_path_or_data, str) and os.path.exists(logs_path_or_data) and logs_path_or_data.endswith('.zip'):
# We received a path to a zip file
with zipfile.ZipFile(logs_path_or_data, 'r') as zip_ref:
log_content = extract_log_from_zip(zip_ref, log_file_name)
# Clean up the downloaded zip if desired
try:
os.remove(logs_path_or_data)
except:
pass # Ignore errors in cleanup
else:
# We received the actual data - process in memory
if isinstance(logs_path_or_data, str):
logs_path_or_data = logs_path_or_data.encode('utf-8')
try:
# Create an in-memory zip file
zip_buffer = io.BytesIO(logs_path_or_data)
with zipfile.ZipFile(zip_buffer, 'r') as zip_ref:
log_content = extract_log_from_zip(zip_ref, log_file_name)
except zipfile.BadZipFile:
return f"Downloaded data is not a valid zip file. First 100 bytes: {logs_path_or_data[:100]}"
# Extract the last num_lines from the log content
log_lines = log_content.splitlines()
log_lines = log_lines[-num_lines:] if len(log_lines) > num_lines else log_lines
result = f"Last {len(log_lines)} lines of {log_file_name}:\n\n"
result += '\n'.join(log_lines)
return result
except Exception as e:
return f"Error getting Plex logs: {str(e)}\n{traceback.format_exc()}"
def extract_log_from_zip(zip_ref, log_file_name):
"""Extract the requested log file content from a zip file object."""
# List all files in the zip
all_files = zip_ref.namelist()
# Find the requested log file
log_file_path = None
for file in all_files:
if log_file_name.lower() in os.path.basename(file).lower():
log_file_path = file
break
if not log_file_path:
raise ValueError(f"Could not find log file for type: {log_file_name}. Available files: {', '.join(all_files)}")
# Read the log file content
with zip_ref.open(log_file_path) as f:
log_content = f.read().decode('utf-8', errors='ignore')
return log_content
@mcp.tool()
async def server_get_info() -> str:
"""Get detailed information about the Plex server.
Returns:
Dictionary containing server details including version, platform, etc.
"""
try:
plex = connect_to_plex()
server_info = {
"version": plex.version,
"platform": plex.platform,
"platform_version": plex.platformVersion,
"updated_at": str(plex.updatedAt) if hasattr(plex, 'updatedAt') else None,
"server_name": plex.friendlyName,
"machine_identifier": plex.machineIdentifier,
"my_plex_username": plex.myPlexUsername,
"my_plex_mapping_state": plex.myPlexMappingState if hasattr(plex, 'myPlexMappingState') else None,
"certificate": plex.certificate if hasattr(plex, 'certificate') else None,
"sync": plex.sync if hasattr(plex, 'sync') else None,
"transcoder_active_video_sessions": plex.transcoderActiveVideoSessions,
"transcoder_audio": plex.transcoderAudio if hasattr(plex, 'transcoderAudio') else None,
"transcoder_video_bitrates": plex.transcoderVideoBitrates,
"transcoder_video_qualities": plex.transcoderVideoQualities,
"transcoder_video_resolutions": plex.transcoderVideoResolutions,
"streaming_brain_version": plex.streamingBrainVersion if hasattr(plex, 'streamingBrainVersion') else None,
"owner_features": plex.ownerFeatures if hasattr(plex, 'ownerFeatures') else None
}
# Format server information as JSON
return json.dumps({"status": "success", "data": server_info}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def server_get_bandwidth(timespan: str = None, lan: str = None) -> str:
"""Get bandwidth statistics from the Plex server.
Args:
timespan: Time span for bandwidth data (months, weeks, days, hours, seconds)
lan: Filter by local network (true/false)
Returns:
Dictionary containing bandwidth statistics
"""
try:
plex = connect_to_plex()
# Get bandwidth information
bandwidth_stats = []
if hasattr(plex, 'bandwidth'):
# Prepare kwargs for bandwidth() call
kwargs = {}
# Add timespan if provided
if timespan:
valid_timespans = ['months', 'weeks', 'days', 'hours', 'seconds']
if timespan.lower() in valid_timespans:
kwargs['timespan'] = timespan.lower()
# Add lan filter if provided
if lan is not None:
if lan.lower() == 'true':
kwargs['lan'] = True
elif lan.lower() == 'false':
kwargs['lan'] = False
# Call bandwidth with the constructed kwargs
bandwidth_data = plex.bandwidth(**kwargs)
for bandwidth in bandwidth_data:
# Each bandwidth object has properties like accountID, at, bytes, deviceID, lan, timespan
stats = {
"account": bandwidth.account().name if bandwidth.account() and hasattr(bandwidth.account(), 'name') else None,
"device_id": bandwidth.deviceID if hasattr(bandwidth, 'deviceID') else None,
"device_name": bandwidth.device().name if bandwidth.device() and hasattr(bandwidth.device(), 'name') else None,
"platform": bandwidth.device().platform if bandwidth.device() and hasattr(bandwidth.device(), 'platform') else None,
"client_identifier": bandwidth.device().clientIdentifier if bandwidth.device() and hasattr(bandwidth.device(), 'clientIdentifier') else None,
"at": str(bandwidth.at) if hasattr(bandwidth, 'at') else None,
"bytes": bandwidth.bytes if hasattr(bandwidth, 'bytes') else None,
"is_local": bandwidth.lan if hasattr(bandwidth, 'lan') else None,
"timespan (seconds)": bandwidth.timespan if hasattr(bandwidth, 'timespan') else None
}
bandwidth_stats.append(stats)
# Format bandwidth information as JSON
return json.dumps({"status": "success", "data": bandwidth_stats}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def server_get_current_resources() -> str:
"""Get resource usage information from the Plex server.
Returns:
Dictionary containing resource usage statistics
"""
try:
plex = connect_to_plex()
# Get resource information
resources_data = []
if hasattr(plex, 'resources'):
server_resources = plex.resources()
for resource in server_resources:
# Create an entry for each resource timepoint
resource_entry = {
"timestamp": str(resource.at) if hasattr(resource, 'at') else None,
"host_cpu_utilization": resource.hostCpuUtilization if hasattr(resource, 'hostCpuUtilization') else None,
"host_memory_utilization": resource.hostMemoryUtilization if hasattr(resource, 'hostMemoryUtilization') else None,
"process_cpu_utilization": resource.processCpuUtilization if hasattr(resource, 'processCpuUtilization') else None,
"process_memory_utilization": resource.processMemoryUtilization if hasattr(resource, 'processMemoryUtilization') else None,
"timespan": resource.timespan if hasattr(resource, 'timespan') else None
}
resources_data.append(resource_entry)
# Format resource information as JSON
return json.dumps({"status": "success", "data": resources_data}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def server_get_butler_tasks() -> str:
"""Get information about Plex Butler tasks.
Returns:
Dictionary containing information about scheduled and running butler tasks
"""
try:
plex = connect_to_plex()
# Get the base URL and token from the Plex connection
base_url = plex._baseurl
token = plex._token
# Make a direct API call to the butler endpoint
url = f"{base_url}/butler"
headers = {'X-Plex-Token': token, 'Accept': 'application/xml'}
# Disable SSL verification if using https
verify = False if base_url.startswith('https') else True
response = requests.get(url, headers=headers, verify=verify)
if response.status_code == 200:
# Parse the XML response
import xml.etree.ElementTree as ET
from xml.dom import minidom
try:
# Try to parse as XML first
root = ET.fromstring(response.text)
# Extract butler tasks
butler_tasks = []
for task_elem in root.findall('.//ButlerTask'):
task = {}
for attr, value in task_elem.attrib.items():
# Convert boolean attributes
if value.lower() in ['true', 'false']:
task[attr] = value.lower() == 'true'
# Convert numeric attributes
elif value.isdigit():
task[attr] = int(value)
else:
task[attr] = value
butler_tasks.append(task)
# Return the butler tasks directly in the data field
return json.dumps({"status": "success", "data": butler_tasks}, indent=4)
except ET.ParseError:
# Return the raw response if XML parsing fails
return json.dumps({
"status": "error",
"message": "Failed to parse XML response",
"raw_response": response.text
}, indent=4)
else:
return json.dumps({
"status": "error",
"message": f"Failed to fetch butler tasks. Status code: {response.status_code}",
"response": response.text
}, indent=4)
except Exception as e:
import traceback
return json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
}, indent=4)
@mcp.tool()
async def server_get_alerts(timeout: int = 15) -> str:
"""Get real-time alerts from the Plex server by listening on a websocket.
Args:
timeout: Number of seconds to listen for alerts (default: 15)
Returns:
Dictionary containing server alerts and their details
"""
try:
plex = connect_to_plex()
# Collection for alerts
alerts_data = []
# Define callback function to process alerts
def alert_callback(data):
# Print the raw data to help with debugging
print(f"Raw alert data received: {data}")
try:
# Extract alert information from the raw notification data
# Assuming data is a list/tuple with at least 3 elements as indicated by the log statement
# Format is likely [type, title, description] or similar
alert_type = data[0] if len(data) > 0 else "Unknown"
alert_title = data[1] if len(data) > 1 else "Unknown"
alert_description = data[2] if len(data) > 2 else "No description"
# Create a simplified single-line text representation of the alert
alert_text = f"ALERT: {alert_type} - {alert_title} - {alert_description}"
# Print to console in real-time
print(alert_text)
# Store alert info for JSON response
alert_info = {
"type": alert_type,
"title": alert_title,
"description": alert_description,
"text": alert_text,
"raw_data": data # Include the raw data for complete information
}
alerts_data.append(alert_info)
except Exception as e:
print(f"Error processing alert data: {e}")
# Still try to store some information even if processing fails
alerts_data.append({
"error": str(e),
"raw_data": str(data)
})
print(f"Starting alert listener for {timeout} seconds...")
# Start the alert listener
listener = plex.startAlertListener(alert_callback)
# Wait for the specified timeout period
await asyncio.sleep(timeout)
# Stop the listener
listener.stop()
print(f"Alert listener stopped after {timeout} seconds.")
# Format alerts as JSON
return json.dumps({"status": "success", "data": alerts_data}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def server_run_butler_task(task_name: str) -> str:
"""Manually run a specific Plex Butler task now.
Args:
task_name: Name of the butler task to run
Returns:
Success or error message
"""
try:
plex = connect_to_plex()
# Call the runButlerTask method directly on the PlexServer object
# Valid task names: 'BackupDatabase', 'CheckForUpdates', 'CleanOldBundles',
# 'DeepMediaAnalysis', 'GarbageCollection', 'GenerateAutoTags',
# 'OptimizeDatabase', 'RefreshLocalMedia', 'RefreshPeriodicMetadata',
# 'RefreshLibraries', 'UpgradeMediaAnalysis'
# Make a direct API call to run the butler task
base_url = plex._baseurl
token = plex._token
# Use the correct URL structure: /butler/{taskName}
url = f"{base_url}/butler/{task_name}"
headers = {'X-Plex-Token': token}
# Disable SSL verification if using https
verify = False if base_url.startswith('https') else True
print(f"Running butler task: {task_name}")
response = requests.post(url, headers=headers, verify=verify)
print(f"Response status: {response.status_code}")
print(f"Response text: {response.text}")
# Add 202 Accepted to the list of successful status codes
if response.status_code in [200, 201, 202, 204]:
return json.dumps({"status": "success", "message": f"Butler task '{task_name}' started successfully"}, indent=4)
else:
# For error responses, extract the status code and response text in a more readable format
error_message = f"Failed to run butler task. Status code: {response.status_code}"
# Try to extract a cleaner error message from the HTML response if possible
if "<html>" in response.text:
import re
# Try to extract the status message from an HTML response (like "404 Not Found")
title_match = re.search(r'<title>(.*?)</title>', response.text)
if title_match and title_match.group(1):
error_message = f"Failed to run butler task: {title_match.group(1)}"
# Or try to extract from an h1 tag
h1_match = re.search(r'<h1>(.*?)</h1>', response.text)
if h1_match and h1_match.group(1):
error_message = f"Failed to run butler task: {h1_match.group(1)}"
return json.dumps({
"status": "error",
"message": error_message
}, indent=4)
except Exception as e:
import traceback
return json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
}, indent=4)
```
--------------------------------------------------------------------------------
/modules/user.py:
--------------------------------------------------------------------------------
```python
from modules import mcp, connect_to_plex
from plexapi.server import PlexServer # type: ignore
import os
import json
import time
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
try:
from dotenv import load_dotenv # type: ignore
# Load environment variables from .env file
load_dotenv()
PLEX_USERNAME = os.environ.get("PLEX_USERNAME", None)
print("Successfully loaded environment variables from .env file")
except ImportError:
print("Warning: python-dotenv not installed. Environment variables won't be loaded from .env file.")
print("Install with: pip install python-dotenv")
@mcp.tool()
async def user_search_users(search_term: str = None) -> str:
"""Search for users with names, usernames, or emails containing the search term, or list all users if no search term is provided.
Args:
search_term: Optional term to search for in user information
"""
try:
plex = connect_to_plex()
# Get account associated with the token
account = plex.myPlexAccount()
# Get list of all friends (shared users)
all_users = account.users()
# Add the owner's account to be searched as well
all_users.append(account)
if search_term:
# Search for users that match the search term
found_users = []
for user in all_users:
username = user.username.lower() if hasattr(user, 'username') else ''
email = user.email.lower() if hasattr(user, 'email') else ''
title = user.title.lower() if hasattr(user, 'title') else ''
if (search_term.lower() in username or
search_term.lower() in email or
search_term.lower() in title):
found_users.append(user)
if not found_users:
return json.dumps({"message": f"No users found matching '{search_term}'."})
# Format the output for found users
result = {
"searchTerm": search_term,
"usersFound": len(found_users),
"users": []
}
for user in found_users:
is_owner = (user.username == account.username)
user_data = {
"role": "Owner" if is_owner else "Shared User",
"username": user.username,
"email": user.email if hasattr(user, 'email') else None,
"title": user.title if hasattr(user, 'title') else user.username
}
# Add servers this user has access to (for shared users)
if not is_owner and hasattr(user, 'servers'):
sections = []
for server in user.servers:
if server.name == account.title or server.name == account.username:
for section in server.sections():
sections.append(section.title)
user_data["libraries"] = sections if sections else []
result["users"].append(user_data)
return json.dumps(result)
else:
# List all users
if not all_users:
return json.dumps({"message": "No shared users found. Only your account has access to this Plex server."})
# Format the output for all users
result = {
"totalUsers": len(all_users),
"owner": {
"username": account.username,
"email": account.email,
"title": account.title
},
"sharedUsers": []
}
# Add all the shared users
for user in all_users:
if user.username != account.username:
result["sharedUsers"].append({
"username": user.username,
"email": user.email if hasattr(user, 'email') else None,
"title": user.title if hasattr(user, 'title') else user.username
})
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error searching users: {str(e)}"})
@mcp.tool()
async def user_get_info(username: str = PLEX_USERNAME) -> str:
"""Get detailed information about a specific Plex user.
Args:
username: Optional. Name of the user to get information for. Defaults to PLEX_USERNAME in .env
"""
try:
plex = connect_to_plex()
# Get account associated with the token
account = plex.myPlexAccount()
# Check if the username is the owner
if username == account.username:
result = {
"role": "Owner",
"username": account.username,
"email": account.email,
"title": account.title,
"uuid": account.uuid,
"authToken": f"{account.authenticationToken[:5]}...{account.authenticationToken[-5:]} (truncated for security)",
"subscription": {
"active": account.subscriptionActive
}
}
if account.subscriptionActive:
result["subscription"]["features"] = account.subscriptionFeatures
result["joinedAt"] = str(account.joinedAt)
return json.dumps(result)
# Search for the user in the friends list
target_user = None
for user in account.users():
if user.username == username:
target_user = user
break
if not target_user:
return json.dumps({"error": f"User '{username}' not found among shared users."})
# Format the output
result = {
"role": "Shared User",
"username": target_user.username,
"email": target_user.email if hasattr(target_user, 'email') else None,
"title": target_user.title if hasattr(target_user, 'title') else target_user.username,
"id": target_user.id if hasattr(target_user, 'id') else None
}
# Add servers and sections this user has access to
if hasattr(target_user, 'servers'):
result["serverAccess"] = []
for server in target_user.servers:
if server.name == account.title or server.name == account.username:
server_data = {
"name": server.name,
"libraries": []
}
for section in server.sections():
server_data["libraries"].append(section.title)
result["serverAccess"].append(server_data)
# Get user's devices if available
if hasattr(target_user, 'devices') and callable(getattr(target_user, 'devices')):
try:
devices = target_user.devices()
if devices:
result["devices"] = []
for device in devices:
device_data = {
"name": device.name,
"platform": device.platform
}
if hasattr(device, 'clientIdentifier'):
device_data["clientId"] = device.clientIdentifier
if hasattr(device, 'createdAt'):
device_data["createdAt"] = str(device.createdAt)
if hasattr(device, 'lastSeenAt'):
device_data["lastSeenAt"] = str(device.lastSeenAt)
result["devices"].append(device_data)
except:
result["devices"] = None
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting user info: {str(e)}"})
@mcp.tool()
async def user_get_on_deck(username: str = PLEX_USERNAME) -> str:
"""Get on deck (in progress) media for a specific user.
Args:
username: Name of the user to get on-deck items for
"""
try:
plex = connect_to_plex()
# Try to switch to the user account to get their specific on-deck items
if username.lower() == plex.myPlexAccount().username.lower():
# This is the main account, use server directly
on_deck_items = plex.library.onDeck()
else:
# For a different user, we need to get access to their account
try:
account = plex.myPlexAccount()
# Find the user in the shared users
target_user = None
for user in account.users():
if user.username.lower() == username.lower() or user.title.lower() == username.lower():
target_user = user
break
if not target_user:
return json.dumps({"error": f"User '{username}' not found."})
# For a shared user, try to switch to that user and get their on-deck items
# This requires admin privileges and may be limited by Plex server's capabilities
user_token = target_user.get_token(plex.machineIdentifier)
if not user_token:
return json.dumps({"error": f"Unable to access on-deck items for user '{username}'. Token not available."})
user_plex = PlexServer(plex._baseurl, user_token)
on_deck_items = user_plex.library.onDeck()
except Exception as user_err:
return json.dumps({"error": f"Error accessing user '{username}': {str(user_err)}"})
if not on_deck_items:
return json.dumps({"message": f"No on-deck items found for user '{username}'."})
result = {
"username": username,
"count": len(on_deck_items),
"items": []
}
for item in on_deck_items:
media_type = getattr(item, 'type', 'unknown')
title = getattr(item, 'title', 'Unknown Title')
item_data = {
"type": media_type,
"title": title
}
if media_type == 'episode':
item_data["show"] = getattr(item, 'grandparentTitle', 'Unknown Show')
item_data["season"] = getattr(item, 'parentTitle', 'Unknown Season')
else:
item_data["year"] = getattr(item, 'year', '')
# Add progress information
if hasattr(item, 'viewOffset') and hasattr(item, 'duration'):
progress_pct = (item.viewOffset / item.duration) * 100
# Format as minutes:seconds
total_mins = item.duration // 60000
current_mins = item.viewOffset // 60000
total_secs = (item.duration % 60000) // 1000
current_secs = (item.viewOffset % 60000) // 1000
# Set progress as a single percentage value
item_data["progress"] = round(progress_pct, 1)
# Add time info as separate fields
item_data["current_time"] = f"{current_mins}:{current_secs:02d}"
item_data["total_time"] = f"{total_mins}:{total_secs:02d}"
result["items"].append(item_data)
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting on-deck items: {str(e)}"})
@mcp.tool()
async def user_get_watch_history(username: str = PLEX_USERNAME, limit: int = 10, content_type: str = None) -> str:
"""Get recent watch history for a specific user.
Args:
username: Name of the user to get watch history for
limit: Maximum number of recently watched items to show
content_type: Optional filter for content type (movie, show, episode, etc)
"""
try:
plex = connect_to_plex()
account = plex.myPlexAccount()
# Track items we've already seen to avoid duplicates when expanding search
seen_item_ids = set()
filtered_items = []
current_search_limit = limit * 2 # Start with 2x the requested limit
max_attempts = 4 # Maximum number of search expansions to prevent infinite loops
attempt = 0
while len(filtered_items) < limit and attempt < max_attempts:
attempt += 1
# For the main account owner
if username.lower() == account.username.lower():
history_items = plex.history(maxresults=current_search_limit)
else:
# For a different user, find them in shared users
target_user = None
for user in account.users():
if user.username.lower() == username.lower() or user.title.lower() == username.lower():
target_user = user
break
if not target_user:
return json.dumps({"error": f"User '{username}' not found."})
# For a shared user, use accountID to filter history
history_items = plex.history(maxresults=current_search_limit, accountID=target_user.id)
# Filter by content type and deduplicate
for item in history_items:
item_id = getattr(item, 'ratingKey', None)
# Skip if we've already processed this item
if item_id and item_id in seen_item_ids:
continue
# Add to seen items
if item_id:
seen_item_ids.add(item_id)
# Apply content type filter if specified
item_type = getattr(item, 'type', 'unknown')
if content_type and item_type.lower() != content_type.lower():
continue
filtered_items.append(item)
# Stop if we've reached the limit
if len(filtered_items) >= limit:
break
# If we still need more items, double the search limit for next attempt
if len(filtered_items) < limit and history_items:
current_search_limit *= 2
else:
# Either we have enough items or there are no more to fetch
break
# If we couldn't find any matching items
if not filtered_items:
message = f"No watch history found for user '{username}'"
if content_type:
message += f" with content type '{content_type}'"
return json.dumps({"message": message})
# Format the results
result = {
"username": username,
"count": len(filtered_items),
"requestedLimit": limit,
"contentType": content_type,
"items": []
}
# Add only the requested limit number of items
for item in filtered_items[:limit]:
media_type = getattr(item, 'type', 'unknown')
title = getattr(item, 'title', 'Unknown Title')
item_data = {
"type": media_type,
"title": title,
"ratingKey": getattr(item, 'ratingKey', None)
}
# Format based on media type
if media_type == 'episode':
item_data["show"] = getattr(item, 'grandparentTitle', 'Unknown Show')
item_data["season"] = getattr(item, 'parentTitle', 'Unknown Season')
item_data["episodeNumber"] = getattr(item, 'index', None)
item_data["seasonNumber"] = getattr(item, 'parentIndex', None)
else:
item_data["year"] = getattr(item, 'year', '')
# Add viewed date if available
if hasattr(item, 'viewedAt') and item.viewedAt:
item_data["viewedAt"] = item.viewedAt.strftime("%Y-%m-%d %H:%M")
result["items"].append(item_data)
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting watch history: {str(e)}"})
@mcp.tool()
async def user_get_statistics(time_period: str = "last_24_hours", username: str = None) -> str:
"""Get statistics about user watch activity over different time periods.
Args:
time_period: Time period for statistics - options: "last_24_hours", "last_7_days", "last_30_days", "last_90_days", "last_year", "all_time"
username: Optional. Filter statistics for a specific user. If not provided, returns statistics for all users.
"""
try:
plex = connect_to_plex()
base_url = plex._baseurl
token = plex._token
# Get the current epoch time
current_time = int(time.time())
# Map time_period to Plex API parameters
time_mapping = {
"last_24_hours": {"timespan": 4, "at": current_time - 24*60*60},
"last_7_days": {"timespan": 3, "at": current_time - 7*24*60*60},
"last_30_days": {"timespan": 2, "at": current_time - 30*24*60*60},
"last_90_days": {"timespan": 2, "at": current_time - 90*24*60*60},
"last_year": {"timespan": 1, "at": current_time - 365*24*60*60},
"all_time": {"timespan": 1, "at": 0}
}
if time_period not in time_mapping:
return json.dumps({"error": f"Invalid time period. Choose from: {', '.join(time_mapping.keys())}"})
# Build the statistics URL
params = time_mapping[time_period]
stats_url = f"{base_url}/statistics/media?timespan={params['timespan']}&at>={params['at']}"
# Add Plex headers
headers = {
'X-Plex-Token': token,
'Accept': 'application/json'
}
# Make the request to get statistics
response = requests.get(stats_url, headers=headers)
if response.status_code != 200:
return json.dumps({"error": f"Failed to fetch statistics: HTTP {response.status_code}"})
data = response.json()
# Get data from response
container = data.get('MediaContainer', {})
device_list = container.get('Device', [])
account_list = container.get('Account', [])
stats_list = container.get('StatisticsMedia', [])
# Create lookup dictionaries for accounts and devices
account_lookup: Dict[int, Dict[str, Any]] = {}
for account in account_list:
account_lookup[account.get('id')] = {
'name': account.get('name'),
'key': account.get('key'),
'thumb': account.get('thumb')
}
device_lookup: Dict[int, Dict[str, Any]] = {}
for device in device_list:
device_lookup[device.get('id')] = {
'name': device.get('name'),
'platform': device.get('platform'),
'clientIdentifier': device.get('clientIdentifier')
}
# Filter by username if specified
target_account_id = None
if username:
# Get the account ID for the specified username
account = plex.myPlexAccount()
# Check if the username matches the owner
if username.lower() == account.username.lower():
# Find the owner's account ID in the account list
for acc in account_list:
if acc.get('name').lower() == username.lower():
target_account_id = acc.get('id')
break
else:
# Check shared users
for user in account.users():
if user.username.lower() == username.lower() or (hasattr(user, 'title') and user.title.lower() == username.lower()):
# Find this user's account ID in the account list
for acc in account_list:
if acc.get('name').lower() == user.username.lower():
target_account_id = acc.get('id')
break
break
if target_account_id is None:
return json.dumps({"error": f"User '{username}' not found in the statistics data."})
# Process the statistics data
user_stats: Dict[int, Dict[str, Any]] = {}
# Media type mapping
media_type_map = {
1: "movie",
4: "episode",
10: "track",
100: "photo"
}
for stat in stats_list:
account_id = stat.get('accountID')
# Skip if we're filtering by user and this isn't the target user
if target_account_id is not None and account_id != target_account_id:
continue
device_id = stat.get('deviceID')
duration = stat.get('duration', 0) # Duration in seconds
count = stat.get('count', 0) # Number of items played
metadata_type = stat.get('metadataType', 0)
media_type = media_type_map.get(metadata_type, f"unknown-{metadata_type}")
# Initialize user stats if not already present
if account_id not in user_stats:
account_info = account_lookup.get(account_id, {'name': f"Unknown User {account_id}"})
user_stats[account_id] = {
'user': account_info.get('name'),
'user_thumb': account_info.get('thumb'),
'total_duration': 0,
'total_plays': 0,
'media_types': {},
'devices': {}
}
# Update total duration and play count
user_stats[account_id]['total_duration'] += duration
user_stats[account_id]['total_plays'] += count
# Update media type stats
if media_type not in user_stats[account_id]['media_types']:
user_stats[account_id]['media_types'][media_type] = {
'duration': 0,
'count': 0
}
user_stats[account_id]['media_types'][media_type]['duration'] += duration
user_stats[account_id]['media_types'][media_type]['count'] += count
# Update device stats
if device_id is not None:
device_info = device_lookup.get(device_id, {'name': f"Unknown Device {device_id}", 'platform': 'unknown'})
device_name = device_info.get('name')
if device_name not in user_stats[account_id]['devices']:
user_stats[account_id]['devices'][device_name] = {
'platform': device_info.get('platform'),
'duration': 0,
'count': 0
}
user_stats[account_id]['devices'][device_name]['duration'] += duration
user_stats[account_id]['devices'][device_name]['count'] += count
# Format duration for better readability in each stat entry
for account_id, stats in user_stats.items():
# Format total duration
hours, remainder = divmod(stats['total_duration'], 3600)
minutes, seconds = divmod(remainder, 60)
stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
# Format media type durations
for media_type, media_stats in stats['media_types'].items():
hours, remainder = divmod(media_stats['duration'], 3600)
minutes, seconds = divmod(remainder, 60)
media_stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
# Format device durations
for device_name, device_stats in stats['devices'].items():
hours, remainder = divmod(device_stats['duration'], 3600)
minutes, seconds = divmod(remainder, 60)
device_stats['formatted_duration'] = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
# Sort users by total duration (descending)
sorted_users = sorted(
user_stats.values(),
key=lambda x: x['total_duration'],
reverse=True
)
# Format the final result
result = {
"time_period": time_period,
"user_filter": username,
"total_users": len(sorted_users),
"stats_generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"users": sorted_users
}
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting user statistics: {str(e)}"})
```
--------------------------------------------------------------------------------
/modules/library.py:
--------------------------------------------------------------------------------
```python
import json
import requests
import aiohttp
import asyncio
from plexapi.exceptions import NotFound # type: ignore
from modules import mcp, connect_to_plex
from urllib.parse import urljoin
import time
def get_plex_headers(plex):
"""Get standard Plex headers for HTTP requests"""
return {
'X-Plex-Token': plex._token,
'Accept': 'application/json'
}
async def async_get_json(session, url, headers):
"""Helper function to make async HTTP requests"""
async with session.get(url, headers=headers) as response:
return await response.json()
@mcp.tool()
async def library_list() -> str:
"""List all available libraries on the Plex server."""
try:
plex = connect_to_plex()
libraries = plex.library.sections()
if not libraries:
return json.dumps({"message": "No libraries found on your Plex server."})
libraries_dict = {}
for lib in libraries:
libraries_dict[lib.title] = {
"type": lib.type,
"libraryId": lib.key,
"totalSize": lib.totalSize,
"uuid": lib.uuid,
"locations": lib.locations,
"updatedAt": lib.updatedAt.isoformat()
}
return json.dumps(libraries_dict)
except Exception as e:
return json.dumps({"error": f"Error listing libraries: {str(e)}"})
@mcp.tool()
async def library_get_stats(library_name: str) -> str:
"""Get statistics for a specific library.
Args:
library_name: Name of the library to get stats for
"""
try:
plex = connect_to_plex()
base_url = plex._baseurl
headers = get_plex_headers(plex)
async with aiohttp.ClientSession() as session:
# First get library sections
sections_url = urljoin(base_url, 'library/sections')
sections_data = await async_get_json(session, sections_url, headers)
target_section = None
for section in sections_data['MediaContainer']['Directory']:
if section['title'].lower() == library_name.lower():
target_section = section
break
if not target_section:
return json.dumps({"error": f"Library '{library_name}' not found"})
section_id = target_section['key']
library_type = target_section['type']
# Create base result
result = {
"name": target_section['title'],
"type": library_type,
"totalItems": target_section.get('totalSize', 0)
}
# Prepare URLs for concurrent requests
all_items_url = urljoin(base_url, f'library/sections/{section_id}/all')
unwatched_url = urljoin(base_url, f'library/sections/{section_id}/all?unwatched=1')
# Make concurrent requests for all and unwatched items
all_data, unwatched_data = await asyncio.gather(
async_get_json(session, all_items_url, headers),
async_get_json(session, unwatched_url, headers)
)
all_data = all_data['MediaContainer']
unwatched_data = unwatched_data['MediaContainer']
if library_type == 'movie':
movie_stats = {
"count": all_data.get('size', 0),
"unwatched": unwatched_data.get('size', 0)
}
# Get genres, directors, studios stats
genres = {}
directors = {}
studios = {}
decades = {}
for movie in all_data.get('Metadata', []):
# Process genres
for genre in movie.get('Genre', []):
genre_name = genre['tag']
genres[genre_name] = genres.get(genre_name, 0) + 1
# Process directors
for director in movie.get('Director', []):
director_name = director['tag']
directors[director_name] = directors.get(director_name, 0) + 1
# Process studios
studio = movie.get('studio')
if studio:
studios[studio] = studios.get(studio, 0) + 1
# Process decades
year = movie.get('year')
if year:
decade = (year // 10) * 10
decades[decade] = decades.get(decade, 0) + 1
# Add top items to results
if genres:
movie_stats["topGenres"] = dict(sorted(genres.items(), key=lambda x: x[1], reverse=True)[:5])
if directors:
movie_stats["topDirectors"] = dict(sorted(directors.items(), key=lambda x: x[1], reverse=True)[:5])
if studios:
movie_stats["topStudios"] = dict(sorted(studios.items(), key=lambda x: x[1], reverse=True)[:5])
if decades:
movie_stats["byDecade"] = dict(sorted(decades.items()))
result["movieStats"] = movie_stats
elif library_type == 'show':
# Prepare URLs for concurrent requests
seasons_url = urljoin(base_url, f'library/sections/{section_id}/all?type=3')
episodes_url = urljoin(base_url, f'library/sections/{section_id}/all?type=4')
# Make concurrent requests for seasons and episodes
seasons_data, episodes_data = await asyncio.gather(
async_get_json(session, seasons_url, headers),
async_get_json(session, episodes_url, headers)
)
seasons_data = seasons_data['MediaContainer']
episodes_data = episodes_data['MediaContainer']
# Process show stats
genres = {}
studios = {}
decades = {}
for show in all_data.get('Metadata', []):
# Process genres
for genre in show.get('Genre', []):
genre_name = genre['tag']
genres[genre_name] = genres.get(genre_name, 0) + 1
# Process studios
studio = show.get('studio')
if studio:
studios[studio] = studios.get(studio, 0) + 1
# Process decades
year = show.get('year')
if year:
decade = (year // 10) * 10
decades[decade] = decades.get(decade, 0) + 1
show_stats = {
"shows": all_data.get('size', 0),
"seasons": seasons_data.get('size', 0),
"episodes": episodes_data.get('size', 0),
"unwatchedShows": unwatched_data.get('size', 0)
}
# Add top items to results
if genres:
show_stats["topGenres"] = dict(sorted(genres.items(), key=lambda x: x[1], reverse=True)[:5])
if studios:
show_stats["topStudios"] = dict(sorted(studios.items(), key=lambda x: x[1], reverse=True)[:5])
if decades:
show_stats["byDecade"] = dict(sorted(decades.items()))
result["showStats"] = show_stats
elif library_type == 'artist':
# Initialize statistics
artist_stats = {
"count": all_data.get('size', 0),
"totalTracks": 0,
"totalAlbums": 0,
"totalPlays": 0
}
# Track data for statistics
all_genres = {}
all_years = {}
top_artists = {}
top_albums = {}
audio_formats = {}
# Process artists one by one for accurate stats
for artist in all_data.get('Metadata', []):
artist_id = artist.get('ratingKey')
artist_name = artist.get('title', '')
if not artist_id:
continue
# Store artist views for top artists calculation
artist_view_count = 0
artist_albums = set()
artist_track_count = 0
# Get tracks directly for this artist
artist_tracks_url = urljoin(base_url, f'library/sections/{section_id}/all?artist.id={artist_id}&type=10')
artist_tracks_data = await async_get_json(session, artist_tracks_url, headers)
if 'MediaContainer' in artist_tracks_data and 'Metadata' in artist_tracks_data['MediaContainer']:
for track in artist_tracks_data['MediaContainer']['Metadata']:
# Count total tracks
artist_track_count += 1
# Count track views for this artist
track_views = track.get('viewCount', 0)
artist_view_count += track_views
artist_stats["totalPlays"] += track_views
# Add album to set
album_title = track.get('parentTitle')
if album_title:
artist_albums.add(album_title)
# Track album plays for top albums
album_key = f"{artist_name} - {album_title}"
if album_key not in top_albums:
top_albums[album_key] = 0
top_albums[album_key] += track_views
# Process genres if available
if 'Genre' in track:
for genre in track.get('Genre', []):
genre_name = genre['tag']
all_genres[genre_name] = all_genres.get(genre_name, 0) + 1
# Process years instead of decades
year = track.get('parentYear') or track.get('year')
if year:
all_years[year] = all_years.get(year, 0) + 1
# Track audio formats
if 'Media' in track and track['Media'] and 'audioCodec' in track['Media'][0]:
audio_codec = track['Media'][0]['audioCodec']
audio_formats[audio_codec] = audio_formats.get(audio_codec, 0) + 1
# Update top artists
if artist_track_count > 0:
top_artists[artist_name] = artist_view_count
# Update totals
artist_stats["totalTracks"] += artist_track_count
artist_stats["totalAlbums"] += len(artist_albums)
# Add top items to results
if all_genres:
artist_stats["topGenres"] = dict(sorted(all_genres.items(), key=lambda x: x[1], reverse=True)[:10])
if top_artists:
artist_stats["topArtists"] = dict(sorted(top_artists.items(), key=lambda x: x[1], reverse=True)[:10])
if top_albums:
artist_stats["topAlbums"] = dict(sorted(top_albums.items(), key=lambda x: x[1], reverse=True)[:10])
if all_years:
artist_stats["byYear"] = dict(sorted(all_years.items()))
if audio_formats:
artist_stats["audioFormats"] = audio_formats
result["musicStats"] = artist_stats
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting library stats: {str(e)}"})
@mcp.tool()
async def library_refresh(library_name: str = None) -> str:
"""Refresh a specific library or all libraries.
Args:
library_name: Optional name of the library to refresh (refreshes all if None)
"""
try:
plex = connect_to_plex()
if library_name:
# Refresh a specific library
section = None
all_sections = plex.library.sections()
# Find the section with matching name (case-insensitive)
for s in all_sections:
if s.title.lower() == library_name.lower():
section = s
break
if not section:
return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
# Refresh the library
section.refresh()
return json.dumps({"success": True, "message": f"Refreshing library '{section.title}'. This may take some time."})
else:
# Refresh all libraries
plex.library.refresh()
return json.dumps({"success": True, "message": "Refreshing all libraries. This may take some time."})
except Exception as e:
return json.dumps({"error": f"Error refreshing library: {str(e)}"})
@mcp.tool()
async def library_scan(library_name: str, path: str = None) -> str:
"""Scan a specific library or part of a library.
Args:
library_name: Name of the library to scan
path: Optional specific path to scan within the library
"""
try:
plex = connect_to_plex()
# Find the specified library
section = None
all_sections = plex.library.sections()
# Find the section with matching name (case-insensitive)
for s in all_sections:
if s.title.lower() == library_name.lower():
section = s
break
if not section:
return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
# Scan the library
if path:
try:
section.update(path=path)
return json.dumps({"success": True, "message": f"Scanning path '{path}' in library '{section.title}'. This may take some time."})
except NotFound:
return json.dumps({"error": f"Path '{path}' not found in library '{section.title}'."})
else:
section.update()
return json.dumps({"success": True, "message": f"Scanning library '{section.title}'. This may take some time."})
except Exception as e:
return json.dumps({"error": f"Error scanning library: {str(e)}"})
@mcp.tool()
async def library_get_details(library_name: str) -> str:
"""Get detailed information about a specific library, including folder paths and settings.
Args:
library_name: Name of the library to get details for
"""
try:
plex = connect_to_plex()
# Get all library sections
all_sections = plex.library.sections()
target_section = None
# Find the section with the matching name (case-insensitive)
for section in all_sections:
if section.title.lower() == library_name.lower():
target_section = section
break
if not target_section:
return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
# Create the result dictionary
result = {
"name": target_section.title,
"type": target_section.type,
"uuid": target_section.uuid,
"totalItems": target_section.totalSize,
"locations": target_section.locations,
"agent": target_section.agent,
"scanner": target_section.scanner,
"language": target_section.language
}
# Get additional attributes using _data
data = target_section._data
# Add scanner settings if available
if 'scannerSettings' in data:
scanner_settings = {}
for setting in data['scannerSettings']:
if 'value' in setting:
scanner_settings[setting.get('key', 'unknown')] = setting['value']
if scanner_settings:
result["scannerSettings"] = scanner_settings
# Add agent settings if available
if 'agentSettings' in data:
agent_settings = {}
for setting in data['agentSettings']:
if 'value' in setting:
agent_settings[setting.get('key', 'unknown')] = setting['value']
if agent_settings:
result["agentSettings"] = agent_settings
# Add advanced settings if available
if 'advancedSettings' in data:
advanced_settings = {}
for setting in data['advancedSettings']:
if 'value' in setting:
advanced_settings[setting.get('key', 'unknown')] = setting['value']
if advanced_settings:
result["advancedSettings"] = advanced_settings
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting library details: {str(e)}"})
@mcp.tool()
async def library_get_recently_added(count: int = 50, library_name: str = None) -> str:
"""Get recently added media across all libraries or in a specific library.
Args:
count: Number of items to return (default: 50)
library_name: Optional library name to limit results to
"""
try:
plex = connect_to_plex()
# Check if we need to filter by library
if library_name:
# Find the specified library
section = None
all_sections = plex.library.sections()
# Find the section with matching name (case-insensitive)
for s in all_sections:
if s.title.lower() == library_name.lower():
section = s
break
if not section:
return json.dumps({"error": f"Library '{library_name}' not found. Available libraries: {', '.join([s.title for s in all_sections])}"})
# Get recently added from this library
recent = section.recentlyAdded(maxresults=count)
else:
# Get recently added across all libraries
recent = plex.library.recentlyAdded()
# Sort by date added (newest first) and limit to count
if recent:
try:
recent = sorted(recent, key=lambda x: getattr(x, 'addedAt', None), reverse=True)[:count]
except Exception as sort_error:
# If sorting fails, just take the first 'count' items
recent = recent[:count]
if not recent:
return json.dumps({"message": "No recently added items found."})
# Prepare the result
result = {
"count": len(recent),
"requestedCount": count,
"library": library_name if library_name else "All Libraries",
"items": {}
}
# Group results by type
for item in recent:
item_type = getattr(item, 'type', 'unknown')
if item_type not in result["items"]:
result["items"][item_type] = []
try:
added_at = str(getattr(item, 'addedAt', 'Unknown date'))
if item_type == 'movie':
result["items"][item_type].append({
"title": item.title,
"year": getattr(item, 'year', ''),
"addedAt": added_at
})
elif item_type == 'show':
result["items"][item_type].append({
"title": item.title,
"year": getattr(item, 'year', ''),
"addedAt": added_at
})
elif item_type == 'season':
result["items"][item_type].append({
"showTitle": getattr(item, 'parentTitle', 'Unknown Show'),
"seasonNumber": getattr(item, 'index', '?'),
"addedAt": added_at
})
elif item_type == 'episode':
result["items"][item_type].append({
"showTitle": getattr(item, 'grandparentTitle', 'Unknown Show'),
"seasonNumber": getattr(item, 'parentIndex', '?'),
"episodeNumber": getattr(item, 'index', '?'),
"title": item.title,
"addedAt": added_at
})
elif item_type == 'artist':
result["items"][item_type].append({
"title": item.title,
"addedAt": added_at
})
elif item_type == 'album':
result["items"][item_type].append({
"artist": getattr(item, 'parentTitle', 'Unknown Artist'),
"title": item.title,
"addedAt": added_at
})
elif item_type == 'track':
result["items"][item_type].append({
"artist": getattr(item, 'grandparentTitle', 'Unknown Artist'),
"album": getattr(item, 'parentTitle', 'Unknown Album'),
"title": item.title,
"addedAt": added_at
})
else:
# Generic handler for other types
result["items"][item_type].append({
"title": getattr(item, 'title', 'Unknown'),
"addedAt": added_at
})
except Exception as format_error:
# If there's an error formatting a particular item, just output basic info
result["items"][item_type].append({
"title": getattr(item, 'title', 'Unknown'),
"error": str(format_error)
})
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting recently added items: {str(e)}"})
@mcp.tool()
async def library_get_contents(library_name: str) -> str:
"""Get the full contents of a specific library.
Args:
library_name: Name of the library to get contents from
Returns:
String listing all items in the library
"""
try:
plex = connect_to_plex()
base_url = plex._baseurl
headers = get_plex_headers(plex)
async with aiohttp.ClientSession() as session:
# First get library sections
sections_url = urljoin(base_url, 'library/sections')
sections_data = await async_get_json(session, sections_url, headers)
target_section = None
for section in sections_data['MediaContainer']['Directory']:
if section['title'].lower() == library_name.lower():
target_section = section
break
if not target_section:
return json.dumps({"error": f"Library '{library_name}' not found"})
section_id = target_section['key']
library_type = target_section['type']
# Get all items
all_items_url = urljoin(base_url, f'library/sections/{section_id}/all')
all_data = await async_get_json(session, all_items_url, headers)
all_data = all_data['MediaContainer']
# Prepare the result
result = {
"name": target_section['title'],
"type": library_type,
"totalItems": all_data.get('size', 0),
"items": []
}
# Process items based on library type
if library_type == 'movie':
for item in all_data.get('Metadata', []):
year = item.get('year', 'Unknown')
duration = item.get('duration', 0)
# Convert duration from milliseconds to hours and minutes
hours, remainder = divmod(duration // 1000, 3600)
minutes, seconds = divmod(remainder, 60)
# Get media info
media_info = {}
if 'Media' in item:
media = item['Media'][0] if item['Media'] else {}
resolution = media.get('videoResolution', '')
codec = media.get('videoCodec', '')
if resolution and codec:
media_info = {
"resolution": resolution,
"codec": codec
}
# Check if watched
watched = item.get('viewCount', 0) > 0
result["items"].append({
"title": item.get('title', ''),
"year": year,
"duration": {
"hours": hours,
"minutes": minutes
},
"mediaInfo": media_info,
"watched": watched
})
elif library_type == 'show':
# Get all shows metadata in parallel
show_urls = [
(item["ratingKey"], urljoin(base_url, f'library/metadata/{item["ratingKey"]}'))
for item in all_data.get('Metadata', [])
]
show_responses = await asyncio.gather(
*[async_get_json(session, url, headers) for _, url in show_urls]
)
for item, show_data in zip(all_data.get('Metadata', []), show_responses):
show_data = show_data['MediaContainer']['Metadata'][0]
year = item.get('year', 'Unknown')
season_count = show_data.get('childCount', 0)
episode_count = show_data.get('leafCount', 0)
watched = episode_count > 0 and show_data.get('viewedLeafCount', 0) == episode_count
result["items"].append({
"title": item.get('title', ''),
"year": year,
"seasonCount": season_count,
"episodeCount": episode_count,
"watched": watched
})
elif library_type == 'artist':
# Process artists one by one for more accurate track/album counting
artists_info = {}
for artist in all_data.get('Metadata', []):
artist_id = artist.get('ratingKey')
artist_name = artist.get('title', '')
if not artist_id:
continue
# Store the original artist viewCount and skipCount as fallback
orig_view_count = artist.get('viewCount', 0)
orig_skip_count = artist.get('skipCount', 0)
# Get tracks directly for this artist
artist_tracks_url = urljoin(base_url, f'library/sections/{section_id}/all?artist.id={artist_id}&type=10')
artist_tracks_data = await async_get_json(session, artist_tracks_url, headers)
# Initialize artist data
if artist_name not in artists_info:
artists_info[artist_name] = {
"title": artist_name,
"albums": set(),
"trackCount": 0,
"viewCount": 0,
"skipCount": 0
}
# Count tracks and albums from the track-level data
track_view_count = 0
track_skip_count = 0
if 'MediaContainer' in artist_tracks_data and 'Metadata' in artist_tracks_data['MediaContainer']:
for track in artist_tracks_data['MediaContainer']['Metadata']:
# Count each track
artists_info[artist_name]["trackCount"] += 1
# Add album to set (to get unique album count)
if 'parentTitle' in track and track['parentTitle']:
artists_info[artist_name]["albums"].add(track['parentTitle'])
# Count views and skips
track_view_count += track.get('viewCount', 0)
track_skip_count += track.get('skipCount', 0)
# Use the sum of track counts if they're non-zero, otherwise fall back to artist level counts
artists_info[artist_name]["viewCount"] = track_view_count if track_view_count > 0 else orig_view_count
artists_info[artist_name]["skipCount"] = track_skip_count if track_skip_count > 0 else orig_skip_count
# Convert album sets to counts and add to results
for artist_name, info in artists_info.items():
result["items"].append({
"title": info["title"],
"albumCount": len(info["albums"]),
"trackCount": info["trackCount"],
"viewCount": info["viewCount"],
"skipCount": info["skipCount"]
})
else:
# Generic handler for other types
for item in all_data.get('Metadata', []):
result["items"].append({
"title": item.get('title', '')
})
return json.dumps(result)
except Exception as e:
return json.dumps({"error": f"Error getting library contents: {str(e)}"})
```
--------------------------------------------------------------------------------
/modules/collection.py:
--------------------------------------------------------------------------------
```python
from plexapi.collection import Collection # type: ignore
from typing import List, Dict, Any
from modules import mcp, connect_to_plex
import os
from plexapi.exceptions import NotFound, BadRequest # type: ignore
import json
@mcp.tool()
async def collection_list(library_name: str = None) -> str:
"""List all collections on the Plex server or in a specific library.
Args:
library_name: Optional name of the library to list collections from
"""
try:
plex = connect_to_plex()
collections_data = []
# If library_name is provided, only show collections from that library
if library_name:
try:
library = plex.library.section(library_name)
collections = library.collections()
for collection in collections:
collection_info = {
"title": collection.title,
"summary": collection.summary,
"is_smart": collection.smart,
"ID": collection.ratingKey,
"items": collection.childCount
}
collections_data.append(collection_info)
return json.dumps(collections_data, indent=4)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# No library specified, get collections from all movie and show libraries
movie_libraries = []
show_libraries = []
for section in plex.library.sections():
if section.type == 'movie':
movie_libraries.append(section)
elif section.type == 'show':
show_libraries.append(section)
# Group collections by library
libraries_collections = {}
# Get movie collections
for library in movie_libraries:
lib_collections = []
for collection in library.collections():
collection_info = {
"title": collection.title,
"summary": collection.summary,
"is_smart": collection.smart,
"ID": collection.ratingKey,
"items": collection.childCount
}
lib_collections.append(collection_info)
libraries_collections[library.title] = {
"type": "movie",
"collections_count": len(lib_collections),
"collections": lib_collections
}
# Get TV show collections
for library in show_libraries:
lib_collections = []
for collection in library.collections():
collection_info = {
"title": collection.title,
"summary": collection.summary,
"is_smart": collection.smart,
"ID": collection.ratingKey,
"items": collection.childCount
}
lib_collections.append(collection_info)
libraries_collections[library.title] = {
"type": "show",
"collections_count": len(lib_collections),
"collections": lib_collections
}
return json.dumps(libraries_collections, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def collection_create(collection_title: str, library_name: str, item_titles: List[str] = None, item_ids: List[int] = None) -> str:
"""Create a new collection with specified items.
Args:
collection_title: Title for the new collection
library_name: Name of the library to create the collection in
item_titles: List of media titles to include in the collection (optional if item_ids is provided)
item_ids: List of media IDs to include in the collection (optional if item_titles is provided)
"""
try:
plex = connect_to_plex()
# Validate that at least one item source is provided
if (not item_titles or len(item_titles) == 0) and (not item_ids or len(item_ids) == 0):
return json.dumps({"error": "Either item_titles or item_ids must be provided"}, indent=4)
# Find the library
try:
library = plex.library.section(library_name)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Check if collection already exists
try:
existing_collection = next((c for c in library.collections() if c.title.lower() == collection_title.lower()), None)
if existing_collection:
return json.dumps({"error": f"Collection '{collection_title}' already exists in library '{library_name}'"}, indent=4)
except Exception:
pass # If we can't check existing collections, proceed anyway
# Find items to add to the collection
items = []
not_found = []
# If we have item IDs, try to add by ID first
if item_ids and len(item_ids) > 0:
for item_id in item_ids:
try:
# Try to fetch the item by ID
item = plex.fetchItem(item_id)
if item:
items.append(item)
else:
not_found.append(str(item_id))
except Exception as e:
not_found.append(str(item_id))
# If we have item titles, search for them
if item_titles and len(item_titles) > 0:
for title in item_titles:
# Search for the media item
search_results = library.search(title=title)
if search_results:
# Check for exact title match (case insensitive)
exact_matches = [item for item in search_results if item.title.lower() == title.lower()]
if exact_matches:
items.append(exact_matches[0])
else:
# No exact match, collect possible matches
possible_matches = []
for item in search_results:
possible_matches.append({
"title": item.title,
"id": item.ratingKey,
"type": item.type,
"year": item.year if hasattr(item, 'year') and item.year else None
})
not_found.append({
"title": title,
"possible_matches": possible_matches
})
else:
not_found.append(title)
# If we have possible matches but no items to add, return the possible matches
if not items and any(isinstance(item, dict) for item in not_found):
possible_matches_response = []
for item in not_found:
if isinstance(item, dict) and "possible_matches" in item:
for match in item["possible_matches"]:
if match not in possible_matches_response:
possible_matches_response.append(match)
return json.dumps({"Multiple Possible Matches Use ID":possible_matches_response}, indent=4)
if not items:
return json.dumps({"error": "No matching media items found for the collection"}, indent=4)
# Create the collection
collection = library.createCollection(title=collection_title, items=items)
return json.dumps({
"created": True,
"title": collection.title,
"id": collection.ratingKey,
"library": library_name,
"items_added": len(items),
"items_not_found": [item for item in not_found if not isinstance(item, dict)]
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def collection_add_to(collection_title: str = None, collection_id: int = None, library_name: str = None, item_titles: List[str] = None, item_ids: List[int] = None) -> str:
"""Add items to an existing collection.
Args:
collection_title: Title of the collection to add to (optional if collection_id is provided)
collection_id: ID of the collection to add to (optional if collection_title is provided)
library_name: Name of the library containing the collection (required if using collection_title)
item_titles: List of media titles to add to the collection (optional if item_ids is provided)
item_ids: List of media IDs to add to the collection (optional if item_titles is provided)
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not collection_id and not collection_title:
return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
# Validate that at least one item source is provided
if (not item_titles or len(item_titles) == 0) and (not item_ids or len(item_ids) == 0):
return json.dumps({"error": "Either item_titles or item_ids must be provided"}, indent=4)
# Find the collection
collection = None
library = None
# If collection_id is provided, use it to directly fetch the collection
if collection_id:
try:
# Try fetching by ratingKey first
try:
collection = plex.fetchItem(collection_id)
except:
# If that fails, try finding by key in all libraries
collection = None
for section in plex.library.sections():
if section.type in ['movie', 'show']:
try:
for c in section.collections():
if c.ratingKey == collection_id:
collection = c
library = section
break
if collection:
break
except:
continue
if not collection:
return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
else:
# If we're searching by title
if not library_name:
return json.dumps({"error": "Library name is required when adding items by collection title"}, indent=4)
# Find the library
try:
library = plex.library.section(library_name)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Find matching collections
matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
if not matching_collections:
return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
# If multiple matching collections, return list of matches with IDs
if len(matching_collections) > 1:
matches = []
for c in matching_collections:
matches.append({
"title": c.title,
"id": c.ratingKey,
"library": library_name,
"item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
collection = matching_collections[0]
# Find items to add
items_to_add = []
not_found = []
already_in_collection = []
current_items = collection.items()
current_item_ids = [item.ratingKey for item in current_items]
# If we have item IDs, try to add by ID first
if item_ids and len(item_ids) > 0:
for item_id in item_ids:
try:
# Try to fetch the item by ID
item = plex.fetchItem(item_id)
if item:
if item.ratingKey in current_item_ids:
already_in_collection.append(str(item_id))
else:
items_to_add.append(item)
else:
not_found.append(str(item_id))
except Exception as e:
not_found.append(str(item_id))
# If we have item titles, search for them with exact matching
if item_titles and len(item_titles) > 0:
if not library:
# This could happen if we found the collection by ID
# Try to determine which library the collection belongs to
for section in plex.library.sections():
if section.type == 'movie' or section.type == 'show':
try:
for c in section.collections():
if c.ratingKey == collection.ratingKey:
library = section
break
if library:
break
except:
continue
if not library:
return json.dumps({"error": "Could not determine which library to search in"}, indent=4)
for title in item_titles:
# Search for the media item with exact matching
search_results = library.search(title=title)
if search_results:
# Check for exact title match (case insensitive)
exact_matches = [item for item in search_results if item.title.lower() == title.lower()]
if exact_matches:
item = exact_matches[0]
if item.ratingKey in current_item_ids:
already_in_collection.append(title)
else:
items_to_add.append(item)
else:
# No exact match, collect possible matches
possible_matches = []
for item in search_results:
possible_matches.append({
"title": item.title,
"id": item.ratingKey,
"type": item.type,
"year": item.year if hasattr(item, 'year') and item.year else None
})
not_found.append({
"title": title,
"possible_matches": possible_matches
})
else:
not_found.append(title)
# If we have possible matches but no items to add, return the possible matches
if not items_to_add and any(isinstance(item, dict) for item in not_found):
possible_matches_response = []
for item in not_found:
if isinstance(item, dict) and "possible_matches" in item:
for match in item["possible_matches"]:
if match not in possible_matches_response:
possible_matches_response.append(match)
return json.dumps(possible_matches_response, indent=4)
# If no items to add and no possible matches
if not items_to_add and not already_in_collection:
return json.dumps({"error": "No matching media items found to add to the collection"}, indent=4)
# Add items to the collection
if items_to_add:
collection.addItems(items_to_add)
return json.dumps({
"added": True,
"title": collection.title,
"items_added": [item.title for item in items_to_add],
"items_already_in_collection": already_in_collection,
"items_not_found": [item for item in not_found if not isinstance(item, dict)],
"total_items": len(collection.items())
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def collection_remove_from(collection_title: str = None, collection_id: int = None, library_name: str = None, item_titles: List[str] = None) -> str:
"""Remove items from a collection.
Args:
collection_title: Title of the collection to remove from (optional if collection_id is provided)
collection_id: ID of the collection to remove from (optional if collection_title is provided)
library_name: Name of the library containing the collection (required if using collection_title)
item_titles: List of media titles to remove from the collection
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not collection_id and not collection_title:
return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
if not item_titles or len(item_titles) == 0:
return json.dumps({"error": "At least one item title must be provided to remove"}, indent=4)
# Find the collection
collection = None
# If collection_id is provided, use it to directly fetch the collection
if collection_id:
try:
# Try fetching by ratingKey first
try:
collection = plex.fetchItem(collection_id)
except:
# If that fails, try finding by key in all libraries
collection = None
for section in plex.library.sections():
if section.type in ['movie', 'show']:
try:
for c in section.collections():
if c.ratingKey == collection_id:
collection = c
break
if collection:
break
except:
continue
if not collection:
return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
else:
# If we get here, we're searching by title
if not library_name:
return json.dumps({"error": "Library name is required when removing items by collection title"}, indent=4)
# Find the library
try:
library = plex.library.section(library_name)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Find matching collections
matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
if not matching_collections:
return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
# If multiple matching collections, return list of matches with IDs
if len(matching_collections) > 1:
matches = []
for c in matching_collections:
matches.append({
"title": c.title,
"id": c.ratingKey,
"library": library_name,
"item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
collection = matching_collections[0]
# Get current items in the collection
collection_items = collection.items()
# Find items to remove
items_to_remove = []
not_found = []
for title in item_titles:
found = False
for item in collection_items:
if item.title.lower() == title.lower():
items_to_remove.append(item)
found = True
break
if not found:
not_found.append(title)
if not items_to_remove:
# No items found to remove, return the current collection contents
current_items = []
for item in collection_items:
current_items.append({
"title": item.title,
"type": item.type,
"id": item.ratingKey
})
return json.dumps({
"error": "No matching items found in the collection to remove",
"collection_title": collection.title,
"collection_id": collection.ratingKey,
"current_items": current_items
}, indent=4)
# Remove items from the collection
collection.removeItems(items_to_remove)
return json.dumps({
"removed": True,
"title": collection.title,
"items_removed": [item.title for item in items_to_remove],
"items_not_found": not_found,
"remaining_items": len(collection.items())
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def collection_delete(collection_title: str = None, collection_id: int = None, library_name: str = None) -> str:
"""Delete a collection.
Args:
collection_title: Title of the collection to delete (optional if collection_id is provided)
collection_id: ID of the collection to delete (optional if collection_title is provided)
library_name: Name of the library containing the collection (required if using collection_title)
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not collection_id and not collection_title:
return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
# If collection_id is provided, use it to directly fetch the collection
if collection_id:
try:
# Try fetching by ratingKey first
try:
collection = plex.fetchItem(collection_id)
except:
# If that fails, try finding by key in all libraries
collection = None
for section in plex.library.sections():
if section.type in ['movie', 'show']:
try:
for c in section.collections():
if c.ratingKey == collection_id:
collection = c
break
if collection:
break
except:
continue
if not collection:
return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
# Get the collection title to return in the message
collection_title_to_return = collection.title
# Delete the collection
collection.delete()
# Return a simple object with the result
return json.dumps({
"deleted": True,
"title": collection_title_to_return
}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
# If we get here, we're searching by title
if not library_name:
return json.dumps({"error": "Library name is required when deleting by collection title"}, indent=4)
# Find the library
try:
library = plex.library.section(library_name)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Find matching collections
matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
if not matching_collections:
return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
# If multiple matching collections, return list of matches with IDs
if len(matching_collections) > 1:
matches = []
for c in matching_collections:
matches.append({
"title": c.title,
"id": c.ratingKey,
"library": library_name,
"item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
collection = matching_collections[0]
collection_title_to_return = collection.title
# Delete the collection
collection.delete()
# Return a simple object with the result
return json.dumps({
"deleted": True,
"title": collection_title_to_return
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def collection_edit(collection_title: str = None, collection_id: int = None, library_name: str = None,
new_title: str = None, new_sort_title: str = None,
new_summary: str = None, new_content_rating: str = None,
new_labels: List[str] = None, add_labels: List[str] = None,
remove_labels: List[str] = None,
poster_path: str = None, poster_url: str = None,
background_path: str = None, background_url: str = None,
new_advanced_settings: Dict[str, Any] = None) -> str:
"""Comprehensively edit a collection's attributes.
Args:
collection_title: Title of the collection to edit (optional if collection_id is provided)
collection_id: ID of the collection to edit (optional if collection_title is provided)
library_name: Name of the library containing the collection (required if using collection_title)
new_title: New title for the collection
new_sort_title: New sort title for the collection
new_summary: New summary/description for the collection
new_content_rating: New content rating (e.g., PG-13, R, etc.)
new_labels: Set completely new labels (replaces existing)
add_labels: Labels to add to existing ones
remove_labels: Labels to remove from existing ones
poster_path: Path to a new poster image file
poster_url: URL to a new poster image
background_path: Path to a new background/art image file
background_url: URL to a new background/art image
new_advanced_settings: Dictionary of advanced settings to apply
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not collection_id and not collection_title:
return json.dumps({"error": "Either collection_id or collection_title must be provided"}, indent=4)
# Find the collection
collection = None
# If collection_id is provided, use it to directly fetch the collection
if collection_id:
try:
# Try fetching by ratingKey first
try:
collection = plex.fetchItem(collection_id)
except:
# If that fails, try finding by key in all libraries
collection = None
for section in plex.library.sections():
if section.type in ['movie', 'show']:
try:
for c in section.collections():
if c.ratingKey == collection_id:
collection = c
break
if collection:
break
except:
continue
if not collection:
return json.dumps({"error": f"Collection with ID '{collection_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching collection by ID: {str(e)}"}, indent=4)
else:
# If we get here, we're searching by title
if not library_name:
return json.dumps({"error": "Library name is required when editing by collection title"}, indent=4)
# Find the library
try:
library = plex.library.section(library_name)
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Find matching collections
matching_collections = [c for c in library.collections() if c.title.lower() == collection_title.lower()]
if not matching_collections:
return json.dumps({"error": f"Collection '{collection_title}' not found in library '{library_name}'"}, indent=4)
# If multiple matching collections, return list of matches with IDs
if len(matching_collections) > 1:
matches = []
for c in matching_collections:
matches.append({
"title": c.title,
"id": c.ratingKey,
"library": library_name,
"item_count": c.childCount if hasattr(c, 'childCount') else len(c.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
collection = matching_collections[0]
# Track changes
changes = []
# Edit basic attributes
edit_params = {}
if new_title is not None and new_title != collection.title:
edit_params['title'] = new_title
changes.append(f"title to '{new_title}'")
if new_sort_title is not None:
current_sort = getattr(collection, 'titleSort', '')
if new_sort_title != current_sort:
edit_params['titleSort'] = new_sort_title
changes.append(f"sort title to '{new_sort_title}'")
if new_summary is not None:
current_summary = getattr(collection, 'summary', '')
if new_summary != current_summary:
edit_params['summary'] = new_summary
changes.append("summary")
if new_content_rating is not None:
current_rating = getattr(collection, 'contentRating', '')
if new_content_rating != current_rating:
edit_params['contentRating'] = new_content_rating
changes.append(f"content rating to '{new_content_rating}'")
# Apply the basic edits if any parameters were set
if edit_params:
collection.edit(**edit_params)
# Handle labels
current_labels = getattr(collection, 'labels', [])
if new_labels is not None:
# Replace all labels
collection.removeLabel(current_labels)
if new_labels:
collection.addLabel(new_labels)
changes.append("labels completely replaced")
else:
# Handle adding and removing individual labels
if add_labels:
for label in add_labels:
if label not in current_labels:
collection.addLabel(label)
changes.append(f"added labels: {', '.join(add_labels)}")
if remove_labels:
for label in remove_labels:
if label in current_labels:
collection.removeLabel(label)
changes.append(f"removed labels: {', '.join(remove_labels)}")
# Handle artwork
if poster_path:
collection.uploadPoster(filepath=poster_path)
changes.append("poster (from file)")
elif poster_url:
collection.uploadPoster(url=poster_url)
changes.append("poster (from URL)")
if background_path:
collection.uploadArt(filepath=background_path)
changes.append("background art (from file)")
elif background_url:
collection.uploadArt(url=background_url)
changes.append("background art (from URL)")
# Handle advanced settings
if new_advanced_settings:
for key, value in new_advanced_settings.items():
try:
setattr(collection, key, value)
changes.append(f"advanced setting '{key}'")
except Exception as setting_error:
return json.dumps({
"error": f"Error setting advanced parameter '{key}': {str(setting_error)}"
}, indent=4)
if not changes:
return json.dumps({"updated": False, "message": "No changes made to the collection"}, indent=4)
# Get the collection title for the response (use new_title if it was changed)
collection_title_to_return = new_title if new_title else collection.title
return json.dumps({
"updated": True,
"title": collection_title_to_return,
"changes": changes
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
```
--------------------------------------------------------------------------------
/modules/client.py:
--------------------------------------------------------------------------------
```python
"""
Client-related functions for Plex Media Server.
Provides tools to connect to clients and control media playback.
"""
import json
import time
from typing import List, Dict, Optional, Union, Any
from modules import mcp, connect_to_plex
from plexapi.exceptions import NotFound, Unauthorized
@mcp.tool()
async def client_list(include_details: bool = True) -> str:
"""List all available Plex clients connected to the server.
Args:
include_details: Whether to include detailed information about each client
Returns:
List of client names or detailed info dictionaries
"""
try:
plex = connect_to_plex()
clients = plex.clients()
# Also get session clients which may not appear in clients()
sessions = plex.sessions()
session_clients = []
# Extract clients from sessions
for session in sessions:
if hasattr(session, 'player') and session.player:
session_clients.append(session.player)
# Combine both client lists, avoiding duplicates
all_clients = clients.copy()
client_ids = {client.machineIdentifier for client in clients}
for client in session_clients:
if hasattr(client, 'machineIdentifier') and client.machineIdentifier not in client_ids:
all_clients.append(client)
client_ids.add(client.machineIdentifier)
if not all_clients:
return json.dumps({
"status": "success",
"message": "No clients currently connected to your Plex server.",
"count": 0,
"clients": []
})
result = []
if include_details:
for client in all_clients:
result.append({
"name": client.title,
"device": getattr(client, 'device', 'Unknown'),
"model": getattr(client, "model", "Unknown"),
"product": getattr(client, 'product', 'Unknown'),
"version": getattr(client, 'version', 'Unknown'),
"platform": getattr(client, "platform", "Unknown"),
"state": getattr(client, "state", "Unknown"),
"machineIdentifier": getattr(client, 'machineIdentifier', 'Unknown'),
"address": getattr(client, "_baseurl", "Unknown") or getattr(client, "address", "Unknown"),
"protocolCapabilities": getattr(client, "protocolCapabilities", [])
})
else:
result = [client.title for client in all_clients]
return json.dumps({
"status": "success",
"message": f"Found {len(all_clients)} connected clients",
"count": len(all_clients),
"clients": result
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error listing clients: {str(e)}"
})
@mcp.tool()
async def client_get_details(client_name: str) -> str:
"""Get detailed information about a specific Plex client.
Args:
client_name: Name of the client to get details for
Returns:
Dictionary containing client details
"""
try:
plex = connect_to_plex()
# Get regular clients
regular_clients = plex.clients()
# Also get clients from sessions
sessions = plex.sessions()
session_clients = []
# Extract clients from sessions
for session in sessions:
if hasattr(session, 'player') and session.player:
session_clients.append(session.player)
# Try to find the client first in regular clients
client = None
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name in regular clients
matching_clients = [c for c in regular_clients if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
# Try to find in session clients
matching_session_clients = [c for c in session_clients if
hasattr(c, 'title') and client_name.lower() in c.title.lower()]
if matching_session_clients:
client = matching_session_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
client_details = {
"name": client.title,
"device": getattr(client, 'device', 'Unknown'),
"deviceClass": getattr(client, "deviceClass", "Unknown"),
"model": getattr(client, "model", "Unknown"),
"product": getattr(client, 'product', 'Unknown'),
"version": getattr(client, 'version', 'Unknown'),
"platform": getattr(client, "platform", "Unknown"),
"platformVersion": getattr(client, "platformVersion", "Unknown"),
"state": getattr(client, "state", "Unknown"),
"machineIdentifier": getattr(client, 'machineIdentifier', 'Unknown'),
"protocolCapabilities": getattr(client, "protocolCapabilities", []),
"address": getattr(client, "_baseurl", "Unknown") or getattr(client, "address", "Unknown"),
"local": getattr(client, "local", "Unknown"),
"protocol": getattr(client, "protocol", "plex"),
"protocolVersion": getattr(client, "protocolVersion", "Unknown"),
"vendor": getattr(client, "vendor", "Unknown"),
}
return json.dumps({
"status": "success",
"client": client_details
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error getting client details: {str(e)}"
})
@mcp.tool()
async def client_get_timelines(client_name: str) -> str:
"""Get the current timeline information for a specific Plex client.
Args:
client_name: Name of the client to get timeline for
Returns:
Timeline information for the client
"""
try:
plex = connect_to_plex()
# Get regular clients
regular_clients = plex.clients()
# Also get clients from sessions
sessions = plex.sessions()
session_clients = []
# Extract clients from sessions
for session in sessions:
if hasattr(session, 'player') and session.player:
session_clients.append(session.player)
# Try to find the client first in regular clients
client = None
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name in regular clients
matching_clients = [c for c in regular_clients if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
# Try to find in session clients
matching_session_clients = [c for c in session_clients if
hasattr(c, 'title') and client_name.lower() in c.title.lower()]
if matching_session_clients:
client = matching_session_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
# Some clients may not always respond to timeline requests
try:
timeline = client.timeline
# If timeline is None, the client might not be actively playing anything
if timeline is None:
# Check if this client has an active session
for session in sessions:
if (hasattr(session, 'player') and session.player and
hasattr(session.player, 'machineIdentifier') and
hasattr(client, 'machineIdentifier') and
session.player.machineIdentifier == client.machineIdentifier):
# Use session information instead
session_data = {
"state": session.player.state if hasattr(session.player, 'state') else "Unknown",
"time": session.viewOffset if hasattr(session, 'viewOffset') else 0,
"duration": session.duration if hasattr(session, 'duration') else 0,
"progress": round((session.viewOffset / session.duration * 100) if hasattr(session, 'viewOffset') and
hasattr(session, 'duration') and session.duration else 0, 2),
"title": session.title if hasattr(session, 'title') else "Unknown",
"type": session.type if hasattr(session, 'type') else "Unknown",
}
return json.dumps({
"status": "success",
"client_name": client.title,
"source": "session",
"timeline": session_data
}, indent=2)
return json.dumps({
"status": "info",
"message": f"Client '{client.title}' is not currently playing any media.",
"client_name": client.title
})
# Process timeline data
timeline_data = {
"type": timeline.type,
"state": timeline.state,
"time": timeline.time,
"duration": timeline.duration,
"progress": round((timeline.time / timeline.duration * 100) if timeline.duration else 0, 2),
"key": getattr(timeline, "key", None),
"ratingKey": getattr(timeline, "ratingKey", None),
"playQueueItemID": getattr(timeline, "playQueueItemID", None),
"playbackRate": getattr(timeline, "playbackRate", 1),
"shuffled": getattr(timeline, "shuffled", False),
"repeated": getattr(timeline, "repeated", 0),
"muted": getattr(timeline, "muted", False),
"volume": getattr(timeline, "volume", None),
"title": getattr(timeline, "title", None),
"guid": getattr(timeline, "guid", None),
}
return json.dumps({
"status": "success",
"client_name": client.title,
"source": "timeline",
"timeline": timeline_data
}, indent=2)
except:
# Check if there's an active session for this client
for session in sessions:
if (hasattr(session, 'player') and session.player and
hasattr(session.player, 'machineIdentifier') and
hasattr(client, 'machineIdentifier') and
session.player.machineIdentifier == client.machineIdentifier):
# Use session information instead
session_data = {
"state": session.player.state if hasattr(session.player, 'state') else "Unknown",
"time": session.viewOffset if hasattr(session, 'viewOffset') else 0,
"duration": session.duration if hasattr(session, 'duration') else 0,
"progress": round((session.viewOffset / session.duration * 100) if hasattr(session, 'viewOffset') and
hasattr(session, 'duration') and session.duration else 0, 2),
"title": session.title if hasattr(session, 'title') else "Unknown",
"type": session.type if hasattr(session, 'type') else "Unknown",
}
return json.dumps({
"status": "success",
"client_name": client.title,
"source": "session",
"timeline": session_data
}, indent=2)
return json.dumps({
"status": "warning",
"message": f"Unable to get timeline information for client '{client.title}'. The client may not be responding to timeline requests.",
"client_name": client.title
})
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error getting client timeline: {str(e)}"
})
@mcp.tool()
async def client_get_active() -> str:
"""Get all clients that are currently playing media.
Returns:
List of active clients with their playback status
"""
try:
plex = connect_to_plex()
# Get all sessions
sessions = plex.sessions()
if not sessions:
return json.dumps({
"status": "success",
"message": "No active playback sessions found.",
"count": 0,
"active_clients": []
})
active_clients = []
for session in sessions:
if hasattr(session, 'player') and session.player:
player = session.player
# Get media information
media_info = {
"title": session.title if hasattr(session, 'title') else "Unknown",
"type": session.type if hasattr(session, 'type') else "Unknown",
}
# Add additional info based on media type
if hasattr(session, 'type'):
if session.type == 'episode':
media_info["show"] = getattr(session, 'grandparentTitle', 'Unknown Show')
media_info["season"] = getattr(session, 'parentTitle', 'Unknown Season')
media_info["seasonEpisode"] = f"S{getattr(session, 'parentIndex', '?')}E{getattr(session, 'index', '?')}"
elif session.type == 'movie':
media_info["year"] = getattr(session, 'year', 'Unknown')
# Calculate progress if possible
progress = None
if hasattr(session, 'viewOffset') and hasattr(session, 'duration') and session.duration:
progress = round((session.viewOffset / session.duration) * 100, 1)
# Get user info
username = "Unknown User"
if hasattr(session, 'usernames') and session.usernames:
username = session.usernames[0]
# Get transcoding status
transcoding = False
if hasattr(session, 'transcodeSessions') and session.transcodeSessions:
transcoding = True
client_info = {
"name": player.title,
"device": getattr(player, 'device', 'Unknown'),
"product": getattr(player, 'product', 'Unknown'),
"platform": getattr(player, 'platform', 'Unknown'),
"state": getattr(player, 'state', 'Unknown'),
"user": username,
"media": media_info,
"progress": progress,
"transcoding": transcoding
}
active_clients.append(client_info)
return json.dumps({
"status": "success",
"message": f"Found {len(active_clients)} active clients",
"count": len(active_clients),
"active_clients": active_clients
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error getting active clients: {str(e)}"
})
@mcp.tool()
async def client_start_playback(media_title: str, client_name: str = None,
offset: int = 0, library_name: str = None,
use_external_player: bool = False) -> str:
"""Start playback of media on a specified client.
Args:
media_title: Title of the media to play
client_name: Optional name of the client to play on (will prompt if not provided)
offset: Optional time offset in milliseconds to start from
library_name: Optional name of the library to search in
use_external_player: Whether to use the client's external player
"""
try:
plex = connect_to_plex()
# First, find the media item
results = []
if library_name:
try:
library = plex.library.section(library_name)
results = library.search(title=media_title)
except Exception:
return json.dumps({
"status": "error",
"message": f"Library '{library_name}' not found"
})
else:
results = plex.search(media_title)
if not results:
return json.dumps({
"status": "error",
"message": f"No media found matching '{media_title}'"
})
if len(results) > 1:
# If multiple results, provide information about them
media_list = []
for i, media in enumerate(results[:10], 1): # Limit to first 10 to avoid overwhelming
media_type = getattr(media, 'type', 'unknown')
title = getattr(media, 'title', 'Unknown')
year = getattr(media, 'year', '')
media_info = {
"index": i,
"title": title,
"type": media_type,
}
if year:
media_info["year"] = year
if media_type == 'episode':
show = getattr(media, 'grandparentTitle', 'Unknown Show')
season = getattr(media, 'parentIndex', '?')
episode = getattr(media, 'index', '?')
media_info["show"] = show
media_info["season"] = season
media_info["episode"] = episode
media_list.append(media_info)
return json.dumps({
"status": "multiple_results",
"message": f"Multiple items found matching '{media_title}'. Please specify a library or use a more specific title.",
"count": len(results),
"results": media_list
}, indent=2)
media = results[0]
# If no client name specified, list available clients
if not client_name:
clients = plex.clients()
if not clients:
return json.dumps({
"status": "error",
"message": "No clients are currently connected to your Plex server."
})
client_list = []
for i, client in enumerate(clients, 1):
client_list.append({
"index": i,
"name": client.title,
"device": getattr(client, 'device', 'Unknown')
})
return json.dumps({
"status": "client_selection",
"message": "Please specify a client to play on using the client_name parameter",
"available_clients": client_list
}, indent=2)
# Try to find the client
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name
matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
# Start playback
media_type = getattr(media, 'type', 'unknown')
title = getattr(media, 'title', 'Unknown')
formatted_title = title
if media_type == 'episode':
show = getattr(media, 'grandparentTitle', 'Unknown Show')
season = getattr(media, 'parentIndex', '?')
episode = getattr(media, 'index', '?')
formatted_title = f"{show} - S{season}E{episode} - {title}"
elif hasattr(media, 'year') and media.year:
formatted_title = f"{title} ({media.year})"
try:
if use_external_player:
# Open in external player if supported by client
if "Player" in client.protocolCapabilities:
media.playOn(client)
else:
return json.dumps({
"status": "error",
"message": f"Client '{client.title}' does not support external player"
})
else:
# Normal playback
client.playMedia(media, offset=offset)
return json.dumps({
"status": "success",
"message": f"Started playback of '{formatted_title}' on {client.title}",
"media": {
"title": title,
"type": media_type,
"formatted_title": formatted_title,
"rating_key": getattr(media, 'ratingKey', None)
},
"client": client.title,
"offset": offset
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error starting playback: {str(e)}"
})
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error setting up playback: {str(e)}"
})
@mcp.tool()
async def client_control_playback(client_name: str, action: str,
parameter: int = None, media_type: str = 'video') -> str:
"""Control playback on a specified client.
Args:
client_name: Name of the client to control
action: Action to perform (play, pause, stop, skipNext, skipPrevious,
stepForward, stepBack, seekTo, seekForward, seekBack, mute, unmute, setVolume)
parameter: Parameter for actions that require it (like setVolume or seekTo)
media_type: Type of media being controlled ('video', 'music', or 'photo')
"""
try:
plex = connect_to_plex()
# Validate action
valid_actions = [
'play', 'pause', 'stop', 'skipNext', 'skipPrevious',
'stepForward', 'stepBack', 'seekTo', 'seekForward', 'seekBack',
'mute', 'unmute', 'setVolume'
]
if action not in valid_actions:
return json.dumps({
"status": "error",
"message": f"Invalid action '{action}'. Valid actions are: {', '.join(valid_actions)}"
})
# Check if parameter is needed but not provided
actions_needing_parameter = ['seekTo', 'setVolume']
if action in actions_needing_parameter and parameter is None:
return json.dumps({
"status": "error",
"message": f"Action '{action}' requires a parameter value."
})
# Validate media type
valid_media_types = ['video', 'music', 'photo']
if media_type not in valid_media_types:
return json.dumps({
"status": "error",
"message": f"Invalid media type '{media_type}'. Valid types are: {', '.join(valid_media_types)}"
})
# Try to find the client
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name
matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
# Check if the client has playback control capability
if "playback" not in client.protocolCapabilities:
return json.dumps({
"status": "error",
"message": f"Client '{client.title}' does not support playback control."
})
# Perform the requested action
try:
# Transport controls
if action == 'play':
client.play()
elif action == 'pause':
client.pause()
elif action == 'stop':
client.stop()
elif action == 'skipNext':
client.skipNext()
elif action == 'skipPrevious':
client.skipPrevious()
elif action == 'stepForward':
client.stepForward()
elif action == 'stepBack':
client.stepBack()
# Seeking
elif action == 'seekTo':
# Parameter should be milliseconds
client.seekTo(parameter)
elif action == 'seekForward':
# Default to 30 seconds if no parameter
seconds = parameter if parameter is not None else 30
client.seekTo(client.timeline.time + (seconds * 1000))
elif action == 'seekBack':
# Default to 30 seconds if no parameter
seconds = parameter if parameter is not None else 30
seek_time = max(0, client.timeline.time - (seconds * 1000))
client.seekTo(seek_time)
# Volume controls
elif action == 'mute':
client.mute()
elif action == 'unmute':
client.unmute()
elif action == 'setVolume':
# Parameter should be 0-100
if parameter < 0 or parameter > 100:
return json.dumps({
"status": "error",
"message": "Volume must be between 0 and 100"
})
client.setVolume(parameter)
# Check timeline to confirm the action (may take a moment to update)
time.sleep(0.5) # Give a short delay for state to update
# Get updated timeline info
timeline = None
try:
timeline = client.timeline
if timeline:
timeline_data = {
"state": timeline.state,
"time": timeline.time,
"duration": timeline.duration,
"volume": getattr(timeline, "volume", None),
"muted": getattr(timeline, "muted", None)
}
else:
timeline_data = None
except:
timeline_data = None
return json.dumps({
"status": "success",
"message": f"Successfully performed action '{action}' on client '{client.title}'",
"action": action,
"client": client.title,
"parameter": parameter,
"timeline": timeline_data
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error controlling playback: {str(e)}"
})
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error setting up playback control: {str(e)}"
})
@mcp.tool()
async def client_navigate(client_name: str, action: str) -> str:
"""Navigate a Plex client interface.
Args:
client_name: Name of the client to navigate
action: Navigation action to perform (moveUp, moveDown, moveLeft, moveRight,
select, back, home, contextMenu)
"""
try:
plex = connect_to_plex()
# Validate action
valid_actions = [
'moveUp', 'moveDown', 'moveLeft', 'moveRight',
'select', 'back', 'home', 'contextMenu'
]
if action not in valid_actions:
return json.dumps({
"status": "error",
"message": f"Invalid navigation action '{action}'. Valid actions are: {', '.join(valid_actions)}"
})
# Try to find the client
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name
matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
# Check if the client has navigation capability
if "navigation" not in client.protocolCapabilities:
return json.dumps({
"status": "error",
"message": f"Client '{client.title}' does not support navigation control."
})
# Perform the requested action
try:
if action == 'moveUp':
client.moveUp()
elif action == 'moveDown':
client.moveDown()
elif action == 'moveLeft':
client.moveLeft()
elif action == 'moveRight':
client.moveRight()
elif action == 'select':
client.select()
elif action == 'back':
client.goBack()
elif action == 'home':
client.goToHome()
elif action == 'contextMenu':
client.contextMenu()
return json.dumps({
"status": "success",
"message": f"Successfully performed navigation action '{action}' on client '{client.title}'",
"action": action,
"client": client.title
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error navigating client: {str(e)}"
})
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error setting up client navigation: {str(e)}"
})
@mcp.tool()
async def client_set_streams(client_name: str, audio_stream_id: str = None,
subtitle_stream_id: str = None, video_stream_id: str = None) -> str:
"""Set audio, subtitle, or video streams for current playback on a client.
Args:
client_name: Name of the client to set streams for
audio_stream_id: ID of the audio stream to switch to
subtitle_stream_id: ID of the subtitle stream to switch to, use '0' to disable
video_stream_id: ID of the video stream to switch to
"""
try:
plex = connect_to_plex()
# Check if at least one stream ID is provided
if audio_stream_id is None and subtitle_stream_id is None and video_stream_id is None:
return json.dumps({
"status": "error",
"message": "At least one stream ID (audio, subtitle, or video) must be provided."
})
# Try to find the client
try:
client = plex.client(client_name)
except NotFound:
# Try to find a client with a matching name
matching_clients = [c for c in plex.clients() if client_name.lower() in c.title.lower()]
if matching_clients:
client = matching_clients[0]
else:
return json.dumps({
"status": "error",
"message": f"No client found matching '{client_name}'"
})
# Check if client is currently playing
timeline = None
try:
timeline = client.timeline
if timeline is None or not hasattr(timeline, 'state') or timeline.state != 'playing':
# Check active sessions to see if this client has a session
sessions = plex.sessions()
client_session = None
for session in sessions:
if (hasattr(session, 'player') and session.player and
hasattr(session.player, 'machineIdentifier') and
hasattr(client, 'machineIdentifier') and
session.player.machineIdentifier == client.machineIdentifier):
client_session = session
break
if not client_session:
return json.dumps({
"status": "error",
"message": f"Client '{client.title}' is not currently playing any media."
})
except:
return json.dumps({
"status": "error",
"message": f"Unable to get playback status for client '{client.title}'."
})
# Set streams
changed_streams = []
try:
if audio_stream_id is not None:
client.setAudioStream(audio_stream_id)
changed_streams.append(f"audio to {audio_stream_id}")
if subtitle_stream_id is not None:
client.setSubtitleStream(subtitle_stream_id)
changed_streams.append(f"subtitle to {subtitle_stream_id}")
if video_stream_id is not None:
client.setVideoStream(video_stream_id)
changed_streams.append(f"video to {video_stream_id}")
return json.dumps({
"status": "success",
"message": f"Successfully set streams for '{client.title}': {', '.join(changed_streams)}",
"client": client.title,
"changes": {
"audio_stream": audio_stream_id if audio_stream_id is not None else None,
"subtitle_stream": subtitle_stream_id if subtitle_stream_id is not None else None,
"video_stream": video_stream_id if video_stream_id is not None else None
}
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error setting streams: {str(e)}"
})
except Exception as e:
return json.dumps({
"status": "error",
"message": f"Error setting up stream selection: {str(e)}"
})
```
--------------------------------------------------------------------------------
/modules/playlist.py:
--------------------------------------------------------------------------------
```python
from modules import mcp, connect_to_plex
from typing import List
from plexapi.playlist import Playlist # type: ignore
from plexapi.exceptions import NotFound, BadRequest # type: ignore
import os
import requests
import base64
import json
# Functions for playlists and collections
@mcp.tool()
async def playlist_list(library_name: str = None, content_type: str = None) -> str:
"""List all playlists on the Plex server.
Args:
library_name: Optional library name to filter playlists from
content_type: Optional content type to filter playlists (audio, video, photo)
"""
try:
plex = connect_to_plex()
playlists = []
# Filter by content type if specified
if content_type:
valid_types = ["audio", "video", "photo"]
if content_type.lower() not in valid_types:
return json.dumps({"error": f"Invalid content type. Valid types are: {', '.join(valid_types)}"}, indent=4)
playlists = plex.playlists(playlistType=content_type.lower())
else:
playlists = plex.playlists()
# Filter by library if specified
if library_name:
try:
library = plex.library.section(library_name)
# Use the section's playlists method directly
if content_type:
playlists = library.playlists(playlistType=content_type.lower())
else:
playlists = library.playlists()
except NotFound:
return json.dumps({"error": f"Library '{library_name}' not found"}, indent=4)
# Format playlist data (lightweight version - no items)
playlist_data = []
for playlist in playlists:
try:
playlist_data.append({
"title": playlist.title,
"key": playlist.key,
"ratingKey": playlist.ratingKey,
"type": playlist.playlistType,
"summary": playlist.summary if hasattr(playlist, 'summary') else "",
"duration": playlist.duration if hasattr(playlist, 'duration') else None,
"item_count": playlist.leafCount if hasattr(playlist, 'leafCount') else None
})
except Exception as item_error:
# If there's an error with a specific playlist, include error info
playlist_data.append({
"title": getattr(playlist, 'title', 'Unknown'),
"key": getattr(playlist, 'key', 'Unknown'),
"error": str(item_error)
})
return json.dumps(playlist_data, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_create(playlist_title: str, item_titles: List[str], library_name: str = None, summary: str = None) -> str:
"""Create a new playlist with specified items.
Args:
playlist_title: Title for the new playlist
item_titles: List of media titles to include in the playlist
library_name: Optional library name to limit search to
summary: Optional summary description for the playlist
"""
try:
plex = connect_to_plex()
items = []
# Search for items in all libraries or specific library
for title in item_titles:
found = False
search_scope = plex.library.section(library_name) if library_name else plex.library
# Search for the item
search_results = search_scope.search(title=title)
if search_results:
items.append(search_results[0])
found = True
if not found:
return json.dumps({"status": "error", "message": f"Item '{title}' not found"}, indent=4)
if not items:
return json.dumps({"status": "error", "message": "No items found for the playlist"}, indent=4)
# Create the playlist
playlist = plex.createPlaylist(title=playlist_title, items=items, summary=summary)
return json.dumps({
"status": "success",
"message": f"Playlist '{playlist_title}' created successfully",
"data": {
"title": playlist.title,
"key": playlist.key,
"ratingKey": playlist.ratingKey,
"item_count": len(items)
}
}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def playlist_edit(playlist_title: str = None, playlist_id: int = None, new_title: str = None, new_summary: str = None) -> str:
"""Edit a playlist's details such as title and summary.
Args:
playlist_title: Title of the playlist to edit (optional if playlist_id is provided)
playlist_id: ID of the playlist to edit (optional if playlist_title is provided)
new_title: Optional new title for the playlist
new_summary: Optional new summary for the playlist
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
# Find the playlist
playlist = None
original_title = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
original_title = playlist.title
except Exception as e:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
playlist = matching_playlists[0]
original_title = playlist.title
# Track changes
changes = []
# Update title if provided
if new_title and new_title != playlist.title:
playlist.edit(title=new_title)
changes.append(f"title from '{original_title}' to '{new_title}'")
# Update summary if provided
if new_summary is not None: # Allow empty summaries
current_summary = playlist.summary if hasattr(playlist, 'summary') else ""
if new_summary != current_summary:
playlist.edit(summary=new_summary)
changes.append("summary")
if not changes:
return json.dumps({
"updated": False,
"title": playlist.title,
"message": "No changes made to the playlist"
}, indent=4)
return json.dumps({
"updated": True,
"title": new_title or playlist.title,
"changes": changes
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_upload_poster(playlist_title: str = None, playlist_id: int = None, poster_url: str = None, poster_filepath: str = None) -> str:
"""Upload a poster image for a playlist.
Args:
playlist_title: Title of the playlist to set poster for (optional if playlist_id is provided)
playlist_id: ID of the playlist to set poster for (optional if playlist_title is provided)
poster_url: URL to an image to use as poster
poster_filepath: Local file path to an image to use as poster
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
# Check that at least one poster source is provided
if not poster_url and not poster_filepath:
return json.dumps({"error": "Either poster_url or poster_filepath must be provided"}, indent=4)
# Find the playlist
playlist = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
playlist = matching_playlists[0]
# Upload from URL
if poster_url:
try:
response = requests.get(poster_url)
if response.status_code != 200:
return json.dumps({"error": f"Failed to download image from URL: {response.status_code}"}, indent=4)
# Upload the poster
playlist.uploadPoster(url=poster_url)
return json.dumps({
"updated": True,
"poster_source": "url",
"title": playlist.title
}, indent=4)
except Exception as url_error:
return json.dumps({"error": f"Error uploading from URL: {str(url_error)}"}, indent=4)
# Upload from file
if poster_filepath:
if not os.path.exists(poster_filepath):
return json.dumps({"error": f"File not found: {poster_filepath}"}, indent=4)
try:
# Upload the poster
playlist.uploadPoster(filepath=poster_filepath)
return json.dumps({
"updated": True,
"poster_source": "file",
"title": playlist.title
}, indent=4)
except Exception as file_error:
return json.dumps({"error": f"Error uploading from file: {str(file_error)}"}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_copy_to_user(playlist_title: str = None, playlist_id: int = None, username: str = None) -> str:
"""Copy a playlist to another user account.
Args:
playlist_title: Title of the playlist to copy (optional if playlist_id is provided)
playlist_id: ID of the playlist to copy (optional if playlist_title is provided)
username: Username of the user to copy the playlist to
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"status": "error", "message": "Either playlist_id or playlist_title must be provided"}, indent=4)
if not username:
return json.dumps({"status": "error", "message": "Username must be provided"}, indent=4)
# Find the playlist
playlist = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"status": "error", "message": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"status": "error", "message": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
return json.dumps({
"status": "multiple_matches",
"message": f"Found {len(matching_playlists)} playlists with title '{playlist_title}'. Please specify the playlist ID.",
"matches": matches
}, indent=4)
playlist = matching_playlists[0]
# Find the user
users = plex.myPlexAccount().users()
user = next((u for u in users if u.title.lower() == username.lower()), None)
if not user:
return json.dumps({"status": "error", "message": f"User '{username}' not found"}, indent=4)
# Copy the playlist
playlist.copyToUser(user=user)
return json.dumps({
"status": "success",
"message": f"Playlist '{playlist.title}' copied to user '{username}'"
}, indent=4)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=4)
@mcp.tool()
async def playlist_add_to(playlist_title: str = None, playlist_id: int = None, item_titles: List[str] = None, item_ids: List[int] = None) -> str:
"""Add items to a playlist.
Args:
playlist_title: Title of the playlist to add to (optional if playlist_id is provided)
playlist_id: ID of the playlist to add to (optional if playlist_title is provided)
item_titles: List of media titles to add to the playlist (optional if item_ids is provided)
item_ids: List of media IDs to add to the playlist (optional if item_titles is provided)
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
# Validate that at least one item source is provided
if (not item_titles or len(item_titles) == 0) and (not item_ids or len(item_ids) == 0):
return json.dumps({"error": "Either item_titles or item_ids must be provided"}, indent=4)
# Find the playlist
playlist = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps({"Multiple Matches":matches}, indent=4)
playlist = matching_playlists[0]
# Find items to add
items_to_add = []
not_found = []
# If we have item IDs, try to add by ID first
if item_ids and len(item_ids) > 0:
for item_id in item_ids:
try:
# Try to fetch the item by ID
item = plex.fetchItem(item_id)
if item:
items_to_add.append(item)
else:
not_found.append(str(item_id))
except Exception as e:
not_found.append(str(item_id))
# If we have item titles, search for them
if item_titles and len(item_titles) > 0:
# Search all library sections
all_sections = plex.library.sections()
for title in item_titles:
found_item = None
possible_matches = []
# Try to find the item in each section
for section in all_sections:
# Skip photo libraries
if section.type in ['photo']:
continue
search_results = section.search(title)
if search_results:
# Check for exact title match (case insensitive)
exact_matches = [item for item in search_results if item.title.lower() == title.lower()]
if exact_matches:
found_item = exact_matches[0]
break
else:
# Add to possible matches if not an exact match
for item in search_results:
possible_matches.append({
"title": item.title,
"id": item.ratingKey,
"type": item.type,
"year": item.year if hasattr(item, 'year') and item.year else None
})
if found_item:
items_to_add.append(found_item)
elif possible_matches:
# If we have possible matches but no exact match, add title to not_found
# and store the possible matches to return later
not_found.append({
"title": title,
"possible_matches": possible_matches
})
else:
not_found.append(title)
if not items_to_add:
# If we have possible matches, return them
if any(isinstance(item, dict) for item in not_found):
possible_matches_response = []
for item in not_found:
if isinstance(item, dict) and "possible_matches" in item:
for match in item["possible_matches"]:
if match not in possible_matches_response:
possible_matches_response.append(match)
return json.dumps({"Multiple Possible Matches Use ID" : possible_matches_response}, indent=4)
return json.dumps({"error": "No matching items found to add to the playlist"}, indent=4)
# Add items to the playlist
for item in items_to_add:
playlist.addItems(item)
return json.dumps({
"added": True,
"title": playlist.title,
"items_added": [item.title for item in items_to_add],
"items_not_found": not_found,
"total_items": len(playlist.items())
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_remove_from(playlist_title: str = None, playlist_id: int = None, item_titles: List[str] = None) -> str:
"""Remove items from a playlist.
Args:
playlist_title: Title of the playlist to remove from (optional if playlist_id is provided)
playlist_id: ID of the playlist to remove from (optional if playlist_title is provided)
item_titles: List of media titles to remove from the playlist
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
if not item_titles or len(item_titles) == 0:
return json.dumps({"error": "At least one item title must be provided to remove"}, indent=4)
# Find the playlist
playlist = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps({"Multiple Matches":matches}, indent=4)
playlist = matching_playlists[0]
# Get current items in the playlist
playlist_items = playlist.items()
# Find items to remove
items_to_remove = []
not_found = []
for title in item_titles:
found = False
for item in playlist_items:
if item.title.lower() == title.lower():
items_to_remove.append(item)
found = True
break
if not found:
not_found.append(title)
if not items_to_remove:
# No items found to remove, return the current playlist contents
current_items = []
for item in playlist_items:
current_items.append({
"title": item.title,
"type": item.type,
"id": item.ratingKey
})
return json.dumps({
"error": "No matching items found in the playlist to remove",
"playlist_title": playlist.title,
"playlist_id": playlist.ratingKey,
"current_items": current_items
}, indent=4)
# Remove items from the playlist
# Using removeItems (plural) since removeItem is deprecated
playlist.removeItems(items_to_remove)
return json.dumps({
"removed": True,
"title": playlist.title,
"items_removed": [item.title for item in items_to_remove],
"items_not_found": not_found,
"remaining_items": len(playlist.items())
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_delete(playlist_title: str = None, playlist_id: int = None) -> str:
"""Delete a playlist.
Args:
playlist_title: Title of the playlist to delete (optional if playlist_id is provided)
playlist_id: ID of the playlist to delete (optional if playlist_title is provided)
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
# Find the playlist
playlist = None
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
except Exception as e:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
else:
# Search by title
playlists = plex.playlists()
matching_playlists = [p for p in playlists if p.title.lower() == playlist_title.lower()]
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
playlist = matching_playlists[0]
# Get the playlist title to return in the message
playlist_title_to_return = playlist.title
# Delete the playlist
playlist.delete()
# Return a simple object with the result
return json.dumps({
"deleted": True,
"title": playlist_title_to_return
}, indent=4)
except Exception as e:
return json.dumps({"error": str(e)}, indent=4)
@mcp.tool()
async def playlist_get_contents(playlist_title: str = None, playlist_id: int = None) -> str:
"""Get the contents of a playlist.
Args:
playlist_title: Title of the playlist to get contents of (optional if playlist_id is provided)
playlist_id: ID of the playlist to get contents of (optional if playlist_title is provided)
Returns:
JSON object containing the playlist contents
"""
try:
plex = connect_to_plex()
# Validate that at least one identifier is provided
if not playlist_id and not playlist_title:
return json.dumps({"error": "Either playlist_id or playlist_title must be provided"}, indent=4)
# If playlist_id is provided, use it to directly fetch the playlist
if playlist_id:
try:
playlist = None
# Try fetching by ratingKey first
try:
playlist = plex.fetchItem(playlist_id)
print(playlist.items())
except:
# If that fails, try finding by key in all playlists
all_playlists = plex.playlists()
playlist = next((p for p in all_playlists if p.ratingKey == playlist_id), None)
if not playlist:
return json.dumps({"error": f"Playlist with ID '{playlist_id}' not found"}, indent=4)
# Get playlist contents
print(playlist)
return get_playlist_contents(playlist)
except Exception as e:
if "500" in str(e):
return json.dumps({"error": "Empty playlist"}, indent=4)
else:
return json.dumps({"error": f"Error fetching playlist by ID: {str(e)}"}, indent=4)
# If we get here, we're searching by title
all_playlists = plex.playlists()
matching_playlists = [p for p in all_playlists if p.title.lower() == playlist_title.lower()]
# If no matching playlists
if not matching_playlists:
return json.dumps({"error": f"No playlist found with title '{playlist_title}'"}, indent=4)
# If multiple matching playlists, return list of matches with IDs
if len(matching_playlists) > 1:
matches = []
for p in matching_playlists:
matches.append({
"title": p.title,
"id": p.ratingKey,
"type": p.playlistType,
"item_count": p.leafCount if hasattr(p, 'leafCount') else len(p.items())
})
# Return as a direct array like playlist_list
return json.dumps(matches, indent=4)
# Single match - get contents
return get_playlist_contents(matching_playlists[0])
except Exception as e:
return json.dumps({"status": "error", "message": f"Error getting playlist contents: {str(e)}"}, indent=4)
def get_playlist_contents(playlist):
"""Helper function to get formatted playlist contents."""
print(playlist)
try:
items = playlist.items()
playlist_items = []
for item in items:
item_data = {
"title": item.title,
"type": item.type,
"ratingKey": item.ratingKey,
"addedAt": item.addedAt.strftime("%Y-%m-%d %H:%M:%S") if hasattr(item, 'addedAt') else None,
"duration": item.duration if hasattr(item, 'duration') else None,
"thumb": item.thumb if hasattr(item, 'thumb') else None
}
# Add media-type specific fields
if item.type == 'movie':
item_data["year"] = item.year if hasattr(item, 'year') else None
elif item.type == 'episode':
item_data["show"] = item.grandparentTitle if hasattr(item, 'grandparentTitle') else None
item_data["season"] = item.parentTitle if hasattr(item, 'parentTitle') else None
item_data["seasonNumber"] = item.parentIndex if hasattr(item, 'parentIndex') else None
item_data["episodeNumber"] = item.index if hasattr(item, 'index') else None
elif item.type == 'track':
item_data["artist"] = item.grandparentTitle if hasattr(item, 'grandparentTitle') else None
item_data["album"] = item.parentTitle if hasattr(item, 'parentTitle') else None
item_data["albumArtist"] = item.originalTitle if hasattr(item, 'originalTitle') else None
playlist_items.append(item_data)
playlist_info = {
"title": playlist.title,
"id": playlist.ratingKey,
"key": playlist.key,
"type": playlist.playlistType,
"summary": playlist.summary if hasattr(playlist, 'summary') else None,
"duration": playlist.duration if hasattr(playlist, 'duration') else None,
"itemCount": len(playlist_items),
"items": playlist_items
}
# Return just the playlist info without status wrappers
return json.dumps(playlist_info, indent=4)
except Exception as e:
return json.dumps({"error": f"Error formatting playlist contents: {str(e)}"}, indent=4)
```