# Directory Structure
```
├── .cursorrules
├── .gitignore
├── .python-version
├── architecture.md
├── media
│ ├── article_spotify.md
│ └── take5.mp4
├── pyproject.toml
├── README.md
├── spotify_mcp.log
├── src
│ └── spotify_mcp
│ ├── __init__.py
│ ├── server.py
│ ├── spotify_api.py
│ ├── spotify_mcp.log
│ └── utils.py
├── test-notebook.ipynb
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.cache
__pycache__/
.env
.idea/
```
--------------------------------------------------------------------------------
/.cursorrules:
--------------------------------------------------------------------------------
```
Here are some best practices and rules you must follow:
- You use Python 3.12
- Frameworks:
- pydantic
- fastapi
- sqlalchemy
- You use uv for dependency management
- You use alembic for database migrations
- You use fastapi-users for user management
- You use fastapi-jwt-auth for authentication
- You use fastapi-mail for email sending
- You use fastapi-cache for caching
- You use fastapi-limiter for rate limiting
- You use fastapi-pagination for pagination
1. **Use Meaningful Names**: Choose descriptive variable, function, and class names.
2. **Follow PEP 8**: Adhere to the Python Enhancement Proposal 8 style guide for formatting.
3. **Use Docstrings**: Document functions and classes with docstrings to explain their purpose.
4. **Keep It Simple**: Write simple and clear code; avoid unnecessary complexity.
5. **Use List Comprehensions**: Prefer list comprehensions for creating lists over traditional loops when appropriate.
6. **Handle Exceptions**: Use try-except blocks to handle exceptions gracefully.
7. **Use Virtual Environments**: Isolate project dependencies using virtual environments (e.g., `venv`).
8. **Write Tests**: Implement unit tests to ensure code reliability.
9. **Use Type Hints**: Utilize type hints for better code clarity and type checking.
10. **Avoid Global Variables**: Limit the use of global variables to reduce side effects.
These rules will help you write clean, efficient, and maintainable Python code.
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# spotify-mcp MCP server
MCP project to connect Claude with Spotify. Built on top of [spotipy-dev's API](https://github.com/spotipy-dev/spotipy/tree/2.24.0).
## Features
- Start, pause, and skip playback
- Search for tracks/albums/artists/playlists
- Get info about a track/album/artist/playlist
- Manage the Spotify queue
## Demo
Make sure to turn on audio
<details>
<summary>
Video
</summary>
https://github.com/user-attachments/assets/20ee1f92-f3e3-4dfa-b945-ca57bc1e0894
</summary>
</details>
## Configuration
### Getting Spotify API Keys
Create an account on [developer.spotify.com](https://developer.spotify.com/). Navigate to [the dashboard](https://developer.spotify.com/dashboard).
Create an app with redirect_uri as http://localhost:8888. (You can choose any port you want but you must use http and localhost).
I set "APIs used" to "Web Playback SDK".
### Run this project locally
This project is not yet set up for ephemeral environments (e.g. `uvx` usage).
Run this project locally by cloning this repo
```bash
git clone https://github.com/varunneal/spotify-mcp.git
```
Add this tool as a mcp server.
On MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
```json
"spotify": {
"command": "uv",
"args": [
"--directory",
"/path/to/spotify_mcp",
"run",
"spotify-mcp"
],
"env": {
"SPOTIFY_CLIENT_ID": YOUR_CLIENT_ID,
"SPOTIFY_CLIENT_SECRET": YOUR_CLIENT_SECRET,
"SPOTIFY_REDIRECT_URI": "http://localhost:8888"
}
}
```
### Troubleshooting
Please open an issue if you can't get this MCP working. Here are some tips:
1. Make sure `uv` is updated. I recommend version `>=0.54`.
2. Make sure claude has execution permisisons for the project: `chmod -R 755`.
3. Ensure you have Spotify premium (needed for running developer API).
This MCP will emit logs to std err (as specified in the MCP) spec. On Mac the Claude Desktop app should emit these logs
to `~/Library/Logs/Claude`.
On other platforms [you can find logs here](https://modelcontextprotocol.io/quickstart/user#getting-logs-from-claude-for-desktop).
## TODO
Unfortunately, a bunch of cool features have [now been deprecated](https://techcrunch.com/2024/11/27/spotify-cuts-developer-access-to-several-of-its-recommendation-features/)
from the Spotify API. Most new features will be relatively minor or for the health of the project:
- tests.
- adding API support for managing playlists.
- adding API support for paginated search results/playlists/albums.
PRs appreciated!
## Deployment
(todo)
### Building and Publishing
To prepare the package for distribution:
1. Sync dependencies and update lockfile:
```bash
uv sync
```
2. Build package distributions:
```bash
uv build
```
This will create source and wheel distributions in the `dist/` directory.
3. Publish to PyPI:
```bash
uv publish
```
Note: You'll need to set PyPI credentials via environment variables or command flags:
- Token: `--token` or `UV_PUBLISH_TOKEN`
- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
### Debugging
Since MCP servers run over stdio, debugging can be challenging. For the best debugging
experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
```bash
npx @modelcontextprotocol/inspector uv --directory /Users/varun/Documents/Python/spotify_mcp run spotify-mcp
```
Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
--------------------------------------------------------------------------------
/src/spotify_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
from . import server
import asyncio
def main():
"""Main entry point for the package."""
asyncio.run(server.main())
# Optionally expose other important items at package level
__all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "spotify-mcp"
version = "0.2.0"
description = "MCP spotify project"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"ipykernel>=6.29.5",
"mcp==1.3.0",
"python-dotenv>=1.0.1",
"spotipy==2.24.0",
]
[[project.authors]]
name = "Varun Srivastava"
email = "[email protected]"
[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
]
[tool.uv.sources]
spotify-mcp = { workspace = true }
[project.scripts]
spotify-mcp = "spotify_mcp:main"
```
--------------------------------------------------------------------------------
/architecture.md:
--------------------------------------------------------------------------------
```markdown
# Architecture du Projet spotify-mcp
Ce document présente l'architecture du projet spotify-mcp, un serveur MCP (Model Context Protocol) permettant à Claude d'interagir avec l'API Spotify.
## Vue d'ensemble
spotify-mcp est un serveur MCP (Model Context Protocol) qui permet à Claude d'interagir avec Spotify. Il utilise la bibliothèque spotipy pour communiquer avec l'API Spotify et expose des fonctionnalités comme la lecture, la pause, la recherche de musique et la gestion de la file d'attente.
## Diagramme de composants
```mermaid
graph TD
Claude[Claude Assistant] <-->|MCP Protocol| Server[Server MCP]
Server <-->|API Calls| SpotifyAPI[Spotify API Client]
SpotifyAPI <-->|HTTP Requests| SpotifyWeb[Spotify Web API]
subgraph "spotify-mcp"
Server
SpotifyAPI
Utils[Utilities]
end
Server --> Utils
SpotifyAPI --> Utils
```
## Structure du projet
```mermaid
graph LR
Root[spotify-mcp/] --> Src[src/]
Root --> Config[pyproject.toml]
Root --> Readme[README.md]
Root --> Env[.env]
Src --> Package[spotify_mcp/]
Package --> Init[__init__.py]
Package --> ServerPy[server.py]
Package --> SpotifyApiPy[spotify_api.py]
Package --> UtilsPy[utils.py]
```
## Flux de données
```mermaid
sequenceDiagram
participant Claude as Claude
participant Server as Server MCP
participant SpotifyAPI as Spotify API Client
participant SpotifyWeb as Spotify Web API
Claude->>Server: Appel d'outil (ex: playback, search, queue)
Server->>SpotifyAPI: Demande correspondante
SpotifyAPI->>SpotifyWeb: Requête HTTP API
SpotifyWeb-->>SpotifyAPI: Réponse JSON
SpotifyAPI-->>Server: Données formatées
Server-->>Claude: Réponse formatée pour l'assistant
```
## Classes principales
### Modèle de classe pour server.py
```mermaid
classDiagram
class Server {
+list_prompts()
+list_resources()
+list_tools()
+call_tool(name, arguments)
}
class ToolModel {
+as_tool()
}
class Playback {
+action: str
+spotify_uri: Optional[str]
+num_skips: Optional[int]
}
class Queue {
+action: str
+track_id: Optional[str]
}
class GetInfo {
+item_uri: str
}
class Search {
+query: str
+qtype: Optional[str]
+limit: Optional[int]
}
class TopItems {
+item_type: str
+time_range: Optional[str]
+limit: Optional[int]
}
ToolModel <|-- Playback
ToolModel <|-- Queue
ToolModel <|-- GetInfo
ToolModel <|-- Search
ToolModel <|-- TopItems
```
### Modèle de classe pour spotify_api.py
```mermaid
classDiagram
class Client {
-sp: Spotify
-auth_manager: SpotifyOAuth
-cache_handler: CacheFileHandler
-username: str
-logger: Logger
+get_username()
+search(query, qtype, limit, device)
+get_top_items(item_type, time_range, limit)
+get_info(item_uri)
+get_current_track()
+start_playback(spotify_uri, device)
+pause_playback(device)
+add_to_queue(track_id, device)
+get_queue(device)
+skip_track(n)
+previous_track()
+seek_to_position(position_ms)
+set_volume(volume_percent)
}
```
## Fonctionnalités principales
1. **Lecture et contrôle**
- Démarrer, mettre en pause, passer à la chanson suivante
- Obtenir des informations sur la piste en cours
- Gérer la file d'attente Spotify
2. **Recherche et découverte**
- Rechercher des pistes, albums, artistes, playlists
- Obtenir des informations détaillées sur un élément Spotify
- Obtenir les éléments préférés de l'utilisateur (artistes, pistes)
## Flux d'authentification
```mermaid
sequenceDiagram
participant User as Utilisateur
participant Server as Server MCP
participant SpotifyOAuth as SpotifyOAuth
participant SpotifyWeb as Spotify Web API
User->>Server: Lance le serveur MCP
Server->>SpotifyOAuth: Initialise l'authentification
SpotifyOAuth->>User: Ouvre le navigateur pour l'autorisation
User->>SpotifyWeb: Autorise l'application
SpotifyWeb->>SpotifyOAuth: Redirige avec le code d'autorisation
SpotifyOAuth->>SpotifyWeb: Échange le code contre un token
SpotifyWeb-->>SpotifyOAuth: Renvoie le token d'accès
SpotifyOAuth-->>Server: Stocke le token dans le cache
Server-->>User: Prêt à recevoir des commandes
```
## Interface MCP
Le projet implémente le protocole MCP (Model Context Protocol) qui permet à Claude d'interagir avec des outils externes. Les outils exposés incluent:
1. **playback** - Contrôle de la lecture
2. **queue** - Gestion de la file d'attente
3. **get_info** - Obtention d'informations sur un élément Spotify
4. **search** - Recherche d'éléments sur Spotify
5. **top_items** - Obtention des éléments préférés de l'utilisateur
## Dépendances
Les principales dépendances du projet sont:
- mcp==1.3.0 - Bibliothèque pour implémenter le protocole MCP
- python-dotenv>=1.0.1 - Pour gérer les variables d'environnement
- spotipy==2.24.0 - SDK Python pour l'API Spotify
```
--------------------------------------------------------------------------------
/src/spotify_mcp/utils.py:
--------------------------------------------------------------------------------
```python
from collections import defaultdict
from typing import Optional, Dict
import functools
from typing import Callable, TypeVar
from urllib.parse import quote
T = TypeVar("T")
def parse_track(track_item: dict, detailed=False) -> Optional[dict]:
if not track_item:
return None
narrowed_item = {
"name": track_item["name"],
"id": track_item["id"],
}
if "is_playing" in track_item:
narrowed_item["is_playing"] = track_item["is_playing"]
if detailed:
narrowed_item["album"] = parse_album(track_item.get("album"))
for k in ["track_number", "duration_ms"]:
narrowed_item[k] = track_item.get(k)
if not track_item.get("is_playable", True):
narrowed_item["is_playable"] = False
artists = [a["name"] for a in track_item["artists"]]
if detailed:
artists = [parse_artist(a) for a in track_item["artists"]]
if len(artists) == 1:
narrowed_item["artist"] = artists[0]
else:
narrowed_item["artists"] = artists
return narrowed_item
def parse_artist(artist_item: dict, detailed=False) -> Optional[dict]:
if not artist_item:
return None
narrowed_item = {
"name": artist_item["name"],
"id": artist_item["id"],
}
if detailed:
narrowed_item["genres"] = artist_item.get("genres")
return narrowed_item
def parse_playlist(playlist_item: dict, username, detailed=False) -> Optional[dict]:
if not playlist_item:
return None
narrowed_item = {
"name": playlist_item["name"],
"id": playlist_item["id"],
"owner": playlist_item["owner"]["display_name"],
"user_is_owner": playlist_item["owner"]["display_name"] == username,
}
if detailed:
narrowed_item["description"] = playlist_item.get("description")
tracks = []
for t in playlist_item["tracks"]["items"]:
tracks.append(parse_track(t["track"]))
narrowed_item["tracks"] = tracks
return narrowed_item
def parse_album(album_item: dict, detailed=False) -> dict:
narrowed_item = {
"name": album_item["name"],
"id": album_item["id"],
}
artists = [a["name"] for a in album_item["artists"]]
if detailed:
tracks = []
for t in album_item["tracks"]["items"]:
tracks.append(parse_track(t))
narrowed_item["tracks"] = tracks
artists = [parse_artist(a) for a in album_item["artists"]]
for k in ["total_tracks", "release_date", "genres"]:
narrowed_item[k] = album_item.get(k)
if len(artists) == 1:
narrowed_item["artist"] = artists[0]
else:
narrowed_item["artists"] = artists
return narrowed_item
def parse_search_results(results: Dict, qtype: str, username: Optional[str] = None):
_results = defaultdict(list)
# potential
# if username:
# _results['User Spotify URI'] = username
for q in qtype.split(","):
match q:
case "track":
for idx, item in enumerate(results["tracks"]["items"]):
if not item:
continue
_results["tracks"].append(parse_track(item))
case "artist":
for idx, item in enumerate(results["artists"]["items"]):
if not item:
continue
_results["artists"].append(parse_artist(item))
case "playlist":
for idx, item in enumerate(results["playlists"]["items"]):
if not item:
continue
_results["playlists"].append(parse_playlist(item, username))
case "album":
for idx, item in enumerate(results["albums"]["items"]):
if not item:
continue
_results["albums"].append(parse_album(item))
case _:
raise ValueError(f"Unknown qtype {qtype}")
return dict(_results)
def build_search_query(
base_query: str,
artist: Optional[str] = None,
track: Optional[str] = None,
album: Optional[str] = None,
year: Optional[str] = None,
year_range: Optional[tuple[int, int]] = None,
# upc: Optional[str] = None,
# isrc: Optional[str] = None,
genre: Optional[str] = None,
is_hipster: bool = False,
is_new: bool = False,
) -> str:
"""
Build a search query string with optional filters.
Args:
base_query: Base search term
artist: Artist name filter
track: Track name filter
album: Album name filter
year: Specific year filter
year_range: Tuple of (start_year, end_year) for year range filter
genre: Genre filter
is_hipster: Filter for lowest 10% popularity albums
is_new: Filter for albums released in past two weeks
Returns:
Encoded query string with applied filters
"""
filters = []
if artist:
filters.append(f"artist:{artist}")
if track:
filters.append(f"track:{track}")
if album:
filters.append(f"album:{album}")
if year:
filters.append(f"year:{year}")
if year_range:
filters.append(f"year:{year_range[0]}-{year_range[1]}")
if genre:
filters.append(f"genre:{genre}")
if is_hipster:
filters.append("tag:hipster")
if is_new:
filters.append("tag:new")
query_parts = [base_query] + filters
return quote(" ".join(query_parts))
def validate(func: Callable[..., T]) -> Callable[..., T]:
"""
Decorator for Spotify API methods that handles authentication and device validation.
- Checks and refreshes authentication if needed
- Validates active device and retries with candidate device if needed
"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
# Handle authentication
if not self.auth_ok():
self.auth_refresh()
# Handle device validation
if not self.is_active_device():
kwargs["device"] = self._get_candidate_device()
# TODO: try-except RequestException
return func(self, *args, **kwargs)
return wrapper
```
--------------------------------------------------------------------------------
/media/article_spotify.md:
--------------------------------------------------------------------------------
```markdown
# Analyse Technique du Spotify Model Context Protocol (MCP)
## Introduction
Le Model Context Protocol (MCP) est un protocole standardisé conçu pour permettre aux modèles de langage (LLM - Large Language Models) d'interagir de manière structurée avec des outils et des systèmes externes. Dans le contexte de ce projet, le Spotify Model Context Protocol permet à l'assistant Claude d'interagir directement avec l'API Spotify.
## Architecture Technique
### Vue d'ensemble
L'architecture du projet repose sur trois composants principaux :
1. Un serveur MCP (Model Context Protocol)
2. Un client API Spotify
3. Des utilitaires de support
L'interaction entre ces composants suit un modèle en couches bien défini, où chaque requête traverse la chaîne suivante :
```Claude → Serveur MCP → Client Spotify API → API Web Spotify```
```mermaid
graph TD
Claude[Claude Assistant] <-->|MCP Protocol| Server[Server MCP]
Server <-->|API Calls| SpotifyAPI[Spotify API Client]
SpotifyAPI <-->|HTTP Requests| SpotifyWeb[Spotify Web API]
subgraph "spotify-mcp"
Server
SpotifyAPI
Utils[Utilities]
end
Server --> Utils
SpotifyAPI --> Utils
```
### Composants Clés
#### 1. Serveur MCP (`server.py`)
Le serveur MCP agit comme point d'entrée principal et expose cinq outils fondamentaux :
- `playback` : Contrôle de lecture (play, pause, skip)
- `queue` : Gestion de la file d'attente
- `get_info` : Récupération des métadonnées
- `search` : Recherche dans le catalogue Spotify
- `top_items` : Accès aux éléments favoris de l'utilisateur
- `playlistcreator` : Crée une playlist et permet d'y ajouter des titres
#### 2. Client Spotify (`spotify_api.py`)
Le client API encapsule toute la logique d'interaction avec Spotify via la bibliothèque `spotipy`. Il gère :
- L'authentification OAuth
- La mise en cache des tokens
- Les appels API
- La gestion des erreurs
### Flux d'Authentification
Le processus d'authentification suit le protocole OAuth 2.0 :
1. Initialisation du serveur MCP
2. Ouverture du navigateur pour autorisation utilisateur
3. Échange du code d'autorisation contre un token
4. Mise en cache du token pour les futures requêtes
## Fonctionnalités Techniques
### 1. Contrôle de Lecture
```python
class Playback(ToolModel):
action: str
spotify_uri: Optional[str]
num_skips: Optional[int]
```
Permet le contrôle granulaire de la lecture avec :
- Démarrage/pause de la lecture
- Navigation entre les pistes
- Contrôle du volume
- Positionnement dans la piste
### 2. Gestion de File d'Attente
```python
class Queue(ToolModel):
action: str
track_id: Optional[str]
```
Offre des capacités de :
- Ajout de pistes à la file
- Consultation de la file
- Manipulation de l'ordre de lecture
### 3. Système de Recherche
```python
class Search(ToolModel):
query: str
qtype: Optional[str]
limit: Optional[int]
```
Implémente une recherche multi-critères pour :
- Pistes
- Albums
- Artistes
- Playlists
### 4. TopItems
#### Structure
```python
class TopItems(ToolModel):
"""Get the user's top artists or tracks based on calculated affinity."""
item_type: str = Field(
description="Type of items to retrieve ('artists' or 'tracks')"
)
time_range: Optional[str] = Field(
default="long_term",
description="Time period over which to retrieve top items: 'long_term' (~ 1 year), 'medium_term' (~ 6 months), or 'short_term' (~ 4 weeks)",
)
limit: Optional[int] = Field(
default=10, description="Number of items to retrieve (max 50)"
)
```
#### Fonctionnalités
- **Type d'éléments** : Permet de récupérer soit les artistes (`artists`) soit les morceaux (`tracks`) les plus écoutés
- **Périodes d'analyse** :
- `long_term` : Environ 1 an d'historique
- `medium_term` : Environ 6 mois d'historique
- `short_term` : Environ 4 semaines d'historique
- **Limitation** : Possibilité de définir le nombre d'éléments à récupérer (max 50)
#### Implémentation
Dans le gestionnaire d'outils (`handle_call_tool`), TopItems est traité comme suit :
```python
case "TopItems":
item_type = arguments.get("item_type", "artists")
time_range = arguments.get("time_range", "long_term")
limit = arguments.get("limit", 10)
top_items = spotify_client.get_top_items(
item_type=item_type,
time_range=time_range,
limit=limit
)
return [
types.TextContent(
type="text",
text=json.dumps(top_items, indent=2)
)
]
```
### 5. PlaylistCreator
#### Structure
```python
class PlaylistCreator(ToolModel):
"""Création et gestion des playlists Spotify"""
action: str = Field(description="Action : 'create', 'search_and_add'")
playlist_details: Optional[dict] = Field(
description={
"name": "Nom de la playlist",
"description": "Description de la playlist",
"public": "Visibilité (true/false)",
"collaborative": "Playlist collaborative (true/false)",
}
)
playlist_id: Optional[str] = Field(
description="ID de la playlist (requis pour search_and_add)"
)
search_query: Optional[str] = Field(description="Recherche de titres à ajouter")
limit: Optional[int] = Field(
default=10, description="Nombre maximum de résultats de recherche"
)
```
#### Fonctionnalités
##### 1. Création de Playlist (`create`)
- Crée une nouvelle playlist avec les paramètres suivants :
- Nom (obligatoire)
- Description (optionnel)
- Visibilité publique/privée
- Option collaborative
- Utilise l'API Spotify pour créer la playlist sous le compte de l'utilisateur actuel
##### 2. Recherche et Ajout (`search_and_add`)
- Permet d'ajouter des titres à une playlist existante
- Fonctionnalités avancées :
- Recherche de playlist par ID ou par nom
- Recherche de titres avec critères spécifiques
- Gestion des erreurs détaillée
- Logging des opérations
#### Implémentation Notable
##### Création de Playlist
```python
user_id = spotify_client.sp.current_user()["id"]
new_playlist = spotify_client.sp.user_playlist_create(
user=user_id,
name=details.get("name"),
public=details.get("public", True),
collaborative=details.get("collaborative", False),
description=details.get("description", ""),
)
```
##### Recherche et Ajout de Titres
```python
# Recherche intelligente de playlist
if not playlist_id.startswith("spotify:playlist:") and not len(playlist_id) == 22:
playlists = spotify_client.sp.current_user_playlists()
for playlist in playlists["items"]:
if playlist["name"] == playlist_id:
playlist_id = playlist["id"]
break
# Recherche de titre
sp_results = spotify_client.sp.search(
q=search_query,
type="track",
limit=1,
market="FR"
)
# Ajout à la playlist
track_uri = sp_results["tracks"]["items"][0]["uri"]
add_result = spotify_client.sp.playlist_add_items(
playlist_id=playlist_id,
items=[track_uri]
)
```
#### Gestion des Erreurs
- Validation des entrées
- Gestion des cas d'erreur Spotify
- Logging détaillé des opérations
- Messages d'erreur explicites pour l'utilisateur
#### Points Forts
1. **Flexibilité** : Support de multiples actions et paramètres
2. **Robustesse** : Gestion complète des erreurs et des cas limites
3. **Traçabilité** : Logging détaillé des opérations
4. **Convivialité** : Messages clairs et retours d'information détaillés
## Aspects Techniques Notables
### Gestion des Erreurs
Le système implémente une gestion robuste des erreurs avec :
- Retry sur les erreurs réseau
- Gestion des expirations de token
- Logging détaillé des erreurs
### Performance
Optimisations notables :
- Mise en cache des tokens d'authentification
- Limitation des requêtes API
- Gestion efficace des ressources
### Extensibilité
L'architecture modulaire permet :
- L'ajout facile de nouveaux outils
- La modification des comportements existants
- L'intégration de nouvelles fonctionnalités Spotify
## Conclusion
Le Spotify MCP représente une solution technique élégante pour l'intégration entre les assistants IA et Spotify. Son architecture modulaire, sa gestion robuste des erreurs et ses fonctionnalités complètes en font un outil puissant pour le contrôle programmatique de Spotify.
## Stack Technique
- Python 3.12
- FastAPI
- Spotipy 2.24.0
- MCP 1.3.0
- Python-dotenv
---
_Note : Cet article est basé sur l'analyse de l'architecture et du code source du projet spotify-mcp._
```
--------------------------------------------------------------------------------
/src/spotify_mcp/spotify_api.py:
--------------------------------------------------------------------------------
```python
import logging
import os
from typing import Optional, Dict, List
import spotipy
from dotenv import load_dotenv
from spotipy.cache_handler import CacheFileHandler
from spotipy.oauth2 import SpotifyOAuth
from . import utils
load_dotenv()
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI")
SCOPES = [
"user-read-currently-playing",
"user-read-playback-state",
"user-modify-playback-state",
"user-read-currently-playing",
"user-top-read",
"playlist-modify-public",
"playlist-modify-private",
"app-remote-control",
"streaming", # playback
"playlist-read-private",
"playlist-read-collaborative",
# playlists
"user-read-playback-position",
"user-read-recently-played", # listening history
"user-library-modify",
"user-library-read", # library
]
class Client:
def __init__(self, logger: logging.Logger):
"""Initialize Spotify client with necessary permissions"""
self.logger = logger
self.logger.info("Initializing Spotify client with logger")
scope = "user-library-read,user-read-playback-state,user-modify-playback-state,user-read-currently-playing,user-top-read,playlist-modify-public,playlist-modify-private"
try:
self.sp = spotipy.Spotify(
auth_manager=SpotifyOAuth(
scope=scope,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uri=REDIRECT_URI,
)
)
self.auth_manager: SpotifyOAuth = self.sp.auth_manager
self.cache_handler: CacheFileHandler = self.auth_manager.cache_handler
self.username = self.get_username()
except Exception as e:
self.logger.error(f"Failed to initialize Spotify client: {str(e)}")
raise
@utils.validate
def get_username(self, device=None):
return self.sp.current_user()["display_name"]
@utils.validate
def search(self, query: str, qtype: str = "track", limit=10, device=None):
"""
Searches based of query term.
- query: query term
- qtype: the types of items to return. One or more of 'artist', 'album', 'track', 'playlist'.
If multiple types are desired, pass in a comma separated string; e.g. 'track,album'
- limit: max # items to return
"""
results = self.sp.search(q=query, limit=limit, type=qtype)
return utils.parse_search_results(results, qtype, self.username)
def recommendations(
self, artists: Optional[List] = None, tracks: Optional[List] = None, limit=20
):
# doesnt work
recs = self.sp.recommendations(
seed_artists=artists, seed_tracks=tracks, limit=limit
)
return recs
def get_top_items(self, item_type="artists", time_range="long_term", limit=10):
"""
Get the current user's top artists or tracks.
Args:
item_type: Type of items to retrieve ('artists' or 'tracks')
time_range: Time period over which to retrieve top items:
'long_term' (~ 1 year), 'medium_term' (~ 6 months),
or 'short_term' (~ 4 weeks)
limit: Number of items to retrieve (max 50)
Returns:
JSON response from the Spotify API containing the top items
"""
if item_type not in ["artists", "tracks"]:
raise ValueError("item_type must be 'artists' or 'tracks'")
if time_range not in ["long_term", "medium_term", "short_term"]:
raise ValueError(
"time_range must be 'long_term', 'medium_term', or 'short_term'"
)
# Convert limit to int if it's a string
try:
limit = int(limit)
except (TypeError, ValueError):
raise ValueError("limit must be a valid integer")
if not 1 <= limit <= 50:
raise ValueError("limit must be between 1 and 50")
try:
self.logger.info(
f"Getting user's top {item_type} for {time_range} with limit {limit}"
)
results = (
self.sp.current_user_top_artists(
limit=limit, offset=0, time_range=time_range
)
if item_type == "artists"
else self.sp.current_user_top_tracks(
limit=limit, offset=0, time_range=time_range
)
)
self.logger.info(
f"Retrieved {len(results.get('items', []))} top {item_type}"
)
return results
except Exception as e:
self.logger.error(f"Error getting top {item_type}: {str(e)}")
raise
def get_info(self, item_uri: str) -> dict:
"""
Returns more info about item.
- item_uri: uri. Looks like 'spotify:track:xxxxxx', 'spotify:album:xxxxxx', etc.
"""
_, qtype, item_id = item_uri.split(":")
match qtype:
case "track":
return utils.parse_track(self.sp.track(item_id), detailed=True)
case "album":
album_info = utils.parse_album(self.sp.album(item_id), detailed=True)
return album_info
case "artist":
artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True)
albums = self.sp.artist_albums(item_id)
top_tracks = self.sp.artist_top_tracks(item_id)["tracks"]
albums_and_tracks = {"albums": albums, "tracks": {"items": top_tracks}}
parsed_info = utils.parse_search_results(
albums_and_tracks, qtype="album,track"
)
artist_info["top_tracks"] = parsed_info["tracks"]
artist_info["albums"] = parsed_info["albums"]
return artist_info
case "playlist":
playlist = self.sp.playlist(item_id)
self.logger.info(f"playlist info is {playlist}")
playlist_info = utils.parse_playlist(
playlist, self.username, detailed=True
)
return playlist_info
raise ValueError(f"Unknown qtype {qtype}")
def get_current_track(self) -> Optional[Dict]:
"""Get information about the currently playing track"""
try:
# current_playback vs current_user_playing_track?
current = self.sp.current_user_playing_track()
if not current:
self.logger.info("No playback session found")
return None
if current.get("currently_playing_type") != "track":
self.logger.info("Current playback is not a track")
return None
track_info = utils.parse_track(current["item"])
if "is_playing" in current:
track_info["is_playing"] = current["is_playing"]
self.logger.info(
f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}"
)
return track_info
except Exception as e:
self.logger.error("Error getting current track info")
raise
@utils.validate
def start_playback(self, spotify_uri=None, device=None):
"""
Starts spotify playback of uri. If spotify_uri is omitted, resumes current playback.
- spotify_uri: ID of resource to play, or None. Typically looks like 'spotify:track:xxxxxx' or 'spotify:album:xxxxxx'.
"""
try:
self.logger.info(
f"Starting playback for spotify_uri: {spotify_uri} on {device}"
)
if not spotify_uri:
if self.is_track_playing():
self.logger.info(
"No track_id provided and playback already active."
)
return
if not self.get_current_track():
raise ValueError(
"No track_id provided and no current playback to resume."
)
if spotify_uri is not None:
if spotify_uri.startswith("spotify:track:"):
uris = [spotify_uri]
context_uri = None
else:
uris = None
context_uri = spotify_uri
else:
uris = None
context_uri = None
device_id = device.get("id") if device else None
self.logger.info(
f"Starting playback of on {device}: context_uri={context_uri}, uris={uris}"
)
result = self.sp.start_playback(
uris=uris, context_uri=context_uri, device_id=device_id
)
self.logger.info(f"Playback result: {result}")
return result
except Exception as e:
self.logger.error(f"Error starting playback: {str(e)}")
raise
@utils.validate
def pause_playback(self, device=None):
"""Pauses playback."""
playback = self.sp.current_playback()
if playback and playback.get("is_playing"):
self.sp.pause_playback(device.get("id") if device else None)
@utils.validate
def add_to_queue(self, track_id: str, device=None):
"""
Adds track to queue.
- track_id: ID of track to play.
"""
self.sp.add_to_queue(track_id, device.get("id") if device else None)
@utils.validate
def get_queue(self, device=None):
"""Returns the current queue of tracks."""
queue_info = self.sp.queue()
self.logger.info(
f"currently playing keys {queue_info['currently_playing'].keys()}"
)
queue_info["currently_playing"] = self.get_current_track()
queue_info["queue"] = [
utils.parse_track(track) for track in queue_info.pop("queue")
]
return queue_info
def get_liked_songs(self):
# todo
results = self.sp.current_user_saved_tracks()
for idx, item in enumerate(results["items"]):
track = item["track"]
print(idx, track["artists"][0]["name"], " – ", track["name"])
def is_track_playing(self) -> bool:
"""Returns if a track is actively playing."""
curr_track = self.get_current_track()
if not curr_track:
return False
if curr_track.get("is_playing"):
return True
return False
def get_devices(self) -> dict:
return self.sp.devices()["devices"]
def is_active_device(self):
return any([device.get("is_active") for device in self.get_devices()])
def _get_candidate_device(self):
devices = self.get_devices()
for device in devices:
if device.get("is_active"):
return device
self.logger.info(f"No active device, assigning {devices[0]['name']}.")
return devices[0]
def auth_ok(self) -> bool:
try:
token = self.cache_handler.get_cached_token()
if token is None:
self.logger.info("Auth check result: no token exists")
return False
is_expired = self.auth_manager.is_token_expired(token)
self.logger.info(
f"Auth check result: {'valid' if not is_expired else 'expired'}"
)
return not is_expired # Return True if token is NOT expired
except Exception as e:
self.logger.error(f"Error checking auth status: {str(e)}")
return False # Return False on error rather than raising
def auth_refresh(self):
self.auth_manager.validate_token(self.cache_handler.get_cached_token())
def skip_track(self, n=1):
# todo: Better error handling
for _ in range(n):
self.sp.next_track()
def previous_track(self):
self.sp.previous_track()
def seek_to_position(self, position_ms):
self.sp.seek_track(position_ms=position_ms)
def set_volume(self, volume_percent):
self.sp.volume(volume_percent)
def get_track_uri_from_title(self, track_title, limit=1):
"""
Recherche une chanson par son titre et retourne son URI Spotify
Parameters:
- track_title: le titre de la chanson à rechercher
- limit: nombre de résultats (par défaut 1 pour obtenir le premier match)
Returns:
- URI de la chanson ou None si rien n'est trouvé
"""
try:
results = self.sp.search(q=track_title, type="track", limit=limit)
if results["tracks"]["items"]:
return results["tracks"]["items"][0]["uri"]
return None
except Exception as e:
self.logger.error(f"Erreur lors de la recherche du titre: {str(e)}")
return None
```
--------------------------------------------------------------------------------
/src/spotify_mcp/server.py:
--------------------------------------------------------------------------------
```python
import sys
import json
import traceback
from typing import Optional, Any
import mcp.types as types
from mcp.server import Server # , stdio_server
import mcp.server.stdio
from pydantic import BaseModel, Field
from spotipy import SpotifyException
from spotify_mcp import spotify_api
def setup_logger():
class Logger:
def __init__(self):
# Créer un fichier de log avec le chemin complet
import os
log_dir = os.path.dirname(os.path.abspath(__file__))
self.log_file = open(
os.path.join(log_dir, "spotify_mcp.log"), "a", encoding="utf-8"
)
def info(self, message):
log_message = f"[INFO] {message}"
print(log_message, file=self.log_file)
self.log_file.flush()
print(log_message) # Affiche aussi dans le terminal
def error(self, message):
log_message = f"[ERROR] {message}"
print(log_message, file=self.log_file)
self.log_file.flush()
print(log_message)
def debug(self, message):
log_message = f"[DEBUG] {message}"
print(log_message, file=self.log_file)
self.log_file.flush()
print(log_message)
def trace(self, message, obj=None):
log_message = f"[TRACE] {message}"
print(log_message, file=self.log_file)
if obj:
print(f"[TRACE] Object: {repr(obj)}", file=self.log_file)
self.log_file.flush()
print(log_message)
if obj:
print(f"[TRACE] Object: {repr(obj)}")
def exception(self, message):
log_message = f"[EXCEPTION] {message}\n{traceback.format_exc()}"
print(log_message, file=self.log_file)
self.log_file.flush()
print(log_message)
def __del__(self):
# Fermer le fichier de log quand l'objet est détruit
if hasattr(self, "log_file"):
self.log_file.close()
return Logger()
def debug_object(obj: Any, name: str = "Object") -> str:
"""Helper function to debug print objects"""
if obj is None:
return f"{name}: None"
try:
return f"{name} ({type(obj).__name__}): {repr(obj)}"
except Exception as e:
return f"{name}: <Error getting representation: {str(e)}>"
server = Server("spotify-mcp")
options = server.create_initialization_options()
global_logger = setup_logger()
# Debug log startup information
global_logger.debug(
f"Server initialized with options: {debug_object(options, 'options')}"
)
global_logger.debug(f"Python version: {sys.version}")
global_logger.debug(f"Arguments: {debug_object(sys.argv, 'sys.argv')}")
try:
global_logger.debug("Initializing Spotify client")
spotify_client = spotify_api.Client(global_logger)
global_logger.debug("Spotify client initialized successfully")
except Exception as e:
global_logger.exception(f"Failed to initialize Spotify client: {str(e)}")
raise
class ToolModel(BaseModel):
@classmethod
def as_tool(cls):
return types.Tool(
name="Spotify" + cls.__name__,
description=cls.__doc__,
inputSchema=cls.model_json_schema(),
)
class Play(ToolModel):
action: str = Field(
description="Action to perform: 'get', 'start', 'pause' or 'skip'."
)
spotify_uri: Optional[str] = Field(
default=None,
description="Spotify uri of item to play for 'start' action. "
+ "If omitted, resumes current playback.",
)
num_skips: Optional[int] = Field(
default=1, description="Number of tracks to skip for `skip` action."
)
class Queue(ToolModel):
"""Manage the playback queue - get the queue or add tracks."""
action: str = Field(description="Action to perform: 'add' or 'get'.")
track_id: Optional[str] = Field(
default=None, description="Track ID to add to queue (required for add action)"
)
class Info(ToolModel):
"""Get information about an item (track, album, artist, or playlist)."""
item_uri: str = Field(
description="URI of the item to get information about. "
+ "If 'playlist' or 'album', returns its tracks. "
+ "If 'artist', returns albums and top tracks."
)
# qtype: str = Field(default="track", description="Type of item: 'track', 'album', 'artist', or 'playlist'. "
# )
class Search(ToolModel):
"""Search for tracks, albums, artists, or playlists on Spotify."""
query: str = Field(description="query term")
qtype: Optional[str] = Field(
default="track",
description="Type of items to search for (track, album, artist, playlist, "
+ "or comma-separated combination)",
)
limit: Optional[int] = Field(
default=10, description="Maximum number of items to return"
)
# Nouvelle classe pour l'historique des artistes les plus écoutés
class TopItems(ToolModel):
"""Get the user's top artists or tracks based on calculated affinity."""
item_type: str = Field(
description="Type of items to retrieve ('artists' or 'tracks')"
)
time_range: Optional[str] = Field(
default="long_term",
description="Time period over which to retrieve top items: 'long_term' (~ 1 year), 'medium_term' (~ 6 months), or 'short_term' (~ 4 weeks)",
)
limit: Optional[int] = Field(
default=10, description="Number of items to retrieve (max 50)"
)
class PlaylistCreator(ToolModel):
"""Création et gestion des playlists Spotify"""
action: str = Field(description="Action : 'create', 'search_and_add'") # Simplifié
playlist_details: Optional[dict] = Field(
description={
"name": "Nom de la playlist",
"description": "Description de la playlist",
"public": "Visibilité (true/false)",
"collaborative": "Playlist collaborative (true/false)",
}
)
playlist_id: Optional[str] = Field(
description="ID de la playlist (requis pour search_and_add)"
)
search_query: Optional[str] = Field(description="Recherche de titres à ajouter")
limit: Optional[int] = Field(
default=10, description="Nombre maximum de résultats de recherche"
)
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
return []
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return []
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools."""
global_logger.info("Listing available tools")
global_logger.debug("handle_list_tools called")
# await server.request_context.session.send_notification("are you recieving this notification?")
tools = [
Play.as_tool(),
Search.as_tool(),
Queue.as_tool(),
Info.as_tool(),
TopItems.as_tool(),
PlaylistCreator.as_tool(),
]
global_logger.info(f"Available tools: {[tool.name for tool in tools]}")
global_logger.debug(f"Returning {len(tools)} tools")
return tools
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""Handle tool execution requests."""
global_logger.info(f"Tool called: {name} with arguments: {arguments}")
assert name[:7] == "Spotify", f"Unknown tool: {name}"
try:
match name[7:]:
case "Play":
action = arguments.get("action")
match action:
case "get":
global_logger.info("Attempting to get current track")
curr_track = spotify_client.get_current_track()
if curr_track:
global_logger.info(
f"Current track retrieved: {curr_track.get('name', 'Unknown')}"
)
return [
types.TextContent(
type="text", text=json.dumps(curr_track, indent=2)
)
]
global_logger.info("No track currently playing")
return [
types.TextContent(type="text", text="No track playing.")
]
case "start":
global_logger.info(
f"Starting playback with arguments: {arguments}"
)
spotify_client.start_playback(
spotify_uri=arguments.get("spotify_uri")
)
global_logger.info("Playback started successfully")
return [
types.TextContent(type="text", text="Playback starting.")
]
case "pause":
global_logger.info("Attempting to pause playback")
spotify_client.pause_playback()
global_logger.info("Playback paused successfully")
return [types.TextContent(type="text", text="Playback paused.")]
case "skip":
num_skips = int(arguments.get("num_skips", 1))
global_logger.info(f"Skipping {num_skips} tracks.")
spotify_client.skip_track(n=num_skips)
return [
types.TextContent(
type="text", text="Skipped to next track."
)
]
case "Search":
global_logger.info(f"Performing search with arguments: {arguments}")
search_results = spotify_client.search(
query=arguments.get("query", ""),
qtype=arguments.get("qtype", "track"),
limit=arguments.get("limit", 10),
)
global_logger.info("Search completed successfully.")
return [
types.TextContent(
type="text", text=json.dumps(search_results, indent=2)
)
]
case "Queue":
global_logger.info(f"Queue operation with arguments: {arguments}")
action = arguments.get("action")
match action:
case "add":
track_id = arguments.get("track_id")
if not track_id:
global_logger.error(
"track_id is required for add to queue."
)
return [
types.TextContent(
type="text",
text="track_id is required for add action",
)
]
spotify_client.add_to_queue(track_id)
return [
types.TextContent(
type="text", text=f"Track added to queue."
)
]
case "get":
queue = spotify_client.get_queue()
return [
types.TextContent(
type="text", text=json.dumps(queue, indent=2)
)
]
case _:
return [
types.TextContent(
type="text",
text=f"Unknown queue action: {action}. Supported actions are: add, remove, and get.",
)
]
case "Info":
global_logger.info(f"Getting item info with arguments: {arguments}")
item_info = spotify_client.get_info(item_uri=arguments.get("item_uri"))
return [
types.TextContent(type="text", text=json.dumps(item_info, indent=2))
]
case "TopItems":
global_logger.info(f"Getting top items with arguments: {arguments}")
item_type = arguments.get("item_type", "artists")
time_range = arguments.get("time_range", "long_term")
limit = arguments.get("limit", 10)
top_items = spotify_client.get_top_items(
item_type=item_type, time_range=time_range, limit=limit
)
return [
types.TextContent(type="text", text=json.dumps(top_items, indent=2))
]
case "PlaylistCreator":
global_logger.info(
f"Handling playlist operation with arguments: {arguments}"
)
action = arguments.get("action")
match action:
case "create":
global_logger.info("Creating a new playlist")
details = arguments.get("playlist_details", {})
# Si details est une chaîne JSON, la convertir en dictionnaire
if isinstance(details, str):
try:
details = json.loads(details)
except json.JSONDecodeError as e:
raise ValueError(
f"Format invalide pour playlist_details: {e}"
)
if "name" not in details:
raise ValueError("Le nom de la playlist est requis")
# Récupérer l'ID de l'utilisateur courant
user_id = spotify_client.sp.current_user()["id"]
# Créer la playlist en utilisant la méthode correcte de spotipy
new_playlist = spotify_client.sp.user_playlist_create(
user=user_id,
name=details.get("name"),
public=details.get("public", True),
collaborative=details.get("collaborative", False),
description=details.get("description", ""),
)
return [
types.TextContent(
type="text",
text=f"Playlist créée avec succès! ID: {new_playlist['id']}",
)
]
case "search_and_add":
global_logger.info("Searching tracks and adding to playlist")
playlist_id = arguments.get("playlist_id")
search_query = arguments.get("search_query")
limit = arguments.get("limit", 10)
global_logger.info(
f"Arguments reçus: {json.dumps(arguments, indent=2)}"
)
# Vérifier si l'ID de playlist est un nom plutôt qu'un ID
try:
# Rechercher d'abord la playlist par son nom si ce n'est pas un ID valide
if (
not playlist_id.startswith("spotify:playlist:")
and not len(playlist_id) == 22
):
playlists = spotify_client.sp.current_user_playlists()
for playlist in playlists["items"]:
if playlist["name"] == playlist_id:
playlist_id = playlist["id"]
global_logger.info(
f"Playlist trouvée par nom, ID: {playlist_id}"
)
break
else:
raise ValueError(
f"Playlist non trouvée : {playlist_id}"
)
# Recherche du titre
global_logger.info(f"Recherche du titre : {search_query}")
sp_results = spotify_client.sp.search(
q=search_query,
type="track",
limit=1,
market="FR", # Ajout du marché pour de meilleurs résultats
)
global_logger.info(
f"Résultats de recherche reçus: {bool(sp_results)}"
)
global_logger.debug(
f"Résultats détaillés: {json.dumps(sp_results, indent=2)}"
)
if not sp_results or not sp_results.get("tracks", {}).get(
"items"
):
raise ValueError(
f"Aucun titre trouvé pour : {search_query}"
)
track = sp_results["tracks"]["items"][0]
track_uri = track["uri"]
global_logger.info(
f"Titre trouvé : {track['name']} ({track_uri})"
)
# Ajouter le titre à la playlist
add_result = spotify_client.sp.playlist_add_items(
playlist_id=playlist_id, items=[track_uri]
)
global_logger.info(f"Résultat de l'ajout : {add_result}")
return [
types.TextContent(
type="text",
text=json.dumps(
{
"message": "Titre ajouté avec succès !",
"track": {
"name": track["name"],
"artist": track["artists"][0]["name"],
"uri": track_uri,
},
},
indent=2,
),
)
]
except Exception as e:
error_details = (
f"Erreur détaillée : {str(e)}\n{traceback.format_exc()}"
)
global_logger.error(error_details)
return [
types.TextContent(
type="text",
text=f"Erreur lors de l'opération : {str(e)}",
)
]
case _:
error_msg = f"Action inconnue: {action}. Actions supportées: create, search_and_add"
global_logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
case _:
error_msg = f"Unknown tool: {name}"
global_logger.error(error_msg)
raise ValueError(error_msg)
except SpotifyException as se:
error_msg = f"Spotify Client error occurred: {str(se)}"
global_logger.error(error_msg)
return [
types.TextContent(
type="text",
text=f"An error occurred with the Spotify Client: {str(se)}",
)
]
except Exception as e:
error_msg = f"Unexpected error occurred: {str(e)}"
global_logger.error(error_msg)
raise
async def main():
global_logger.debug("====== main() function started ======")
try:
global_logger.debug("Initializing stdio server")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
global_logger.debug(
f"stdio server initialized: read_stream={debug_object(read_stream, 'read_stream')}, write_stream={debug_object(write_stream, 'write_stream')}"
)
try:
global_logger.debug("About to call server.run()")
await server.run(read_stream, write_stream, options)
global_logger.debug("server.run() completed normally")
except Exception as e:
global_logger.exception(f"Error in server.run(): {str(e)}")
raise
global_logger.debug("stdio server context exited")
except Exception as e:
global_logger.exception(f"Error in main(): {str(e)}")
raise
finally:
global_logger.debug("====== main() function exiting ======")
if __name__ == "__main__":
global_logger.debug("Module executed directly")
import asyncio
global_logger.debug("Starting asyncio.run(main())")
try:
asyncio.run(main())
global_logger.debug("asyncio.run(main()) completed successfully")
except Exception as e:
global_logger.exception(f"Uncaught exception in asyncio.run(main()): {str(e)}")
sys.exit(1)
global_logger.debug("Script exiting")
```