# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── media
│   └── take5.mp4
├── pyproject.toml
├── README.md
├── src
│   └── spotify_mcp
│       ├── __init__.py
│       ├── server.py
│       ├── spotify_api.py
│       └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
1 | .cache
2 | .DS_Store
3 | __pycache__/
4 | .env
5 | .idea/
6 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # spotify-mcp MCP server
  2 | 
  3 | MCP project to connect Claude with Spotify. Built on top of [spotipy-dev's API](https://github.com/spotipy-dev/spotipy/tree/2.24.0).
  4 | 
  5 | ## Features
  6 | 
  7 | - Start, pause, and skip playback
  8 | - Search for tracks/albums/artists/playlists
  9 | - Get info about a track/album/artist/playlist
 10 | - Manage the Spotify queue
 11 | - Manage, create, and update playlists
 12 | 
 13 | ## Demo
 14 | 
 15 | <details>
 16 |   <summary>
 17 |     Video -- turn on audio
 18 |   </summary>
 19 |   https://github.com/user-attachments/assets/20ee1f92-f3e3-4dfa-b945-ca57bc1e0894
 20 | </details>
 21 | 
 22 | ## Configuration
 23 | 
 24 | ### Getting Spotify API Keys
 25 | 
 26 | Create an account on [developer.spotify.com](https://developer.spotify.com/). Navigate to [the dashboard](https://developer.spotify.com/dashboard). 
 27 | Create an app with redirect_uri as http://127.0.0.1:8080/callback. 
 28 | You can choose any port you want but you must use http and an explicit loopback address (IPv4 or IPv6).
 29 | 
 30 | See [here](https://developer.spotify.com/documentation/web-api/concepts/redirect_uri) for more info/troubleshooting. 
 31 | You may have to restart your MCP environment (e.g. Claude Desktop) once or twice before it works.
 32 | 
 33 | ### Locating MCP Config
 34 | 
 35 | For Cursor, Claude Desktop, or any other MCP-enabled client you will have to locate your config.
 36 | 
 37 | - Claude Desktop location on MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
 38 | 
 39 | - Claude Desktop location on Windows: `%APPDATA%/Claude/claude_desktop_config.json`
 40 | 
 41 | 
 42 | ### Run this project with uvx
 43 | 
 44 | Add this snippet to your MCP Config.
 45 | 
 46 | ```json
 47 | {
 48 |   "mcpServers": {
 49 |     "spotify": {
 50 |       "command": "uvx",
 51 |       "args": [
 52 |         "--python", "3.12",
 53 |         "--from", "git+https://github.com/varunneal/spotify-mcp",
 54 |         "spotify-mcp"
 55 |       ],
 56 |       "env": {
 57 |         "SPOTIFY_CLIENT_ID": "YOUR_CLIENT_ID",
 58 |         "SPOTIFY_CLIENT_SECRET": "YOUR_CLIENT_SECRET",
 59 |         "SPOTIFY_REDIRECT_URI": "http://127.0.0.1:8080/callback"
 60 |       }
 61 |     }
 62 |   }
 63 | }
 64 | ```
 65 | 
 66 | ### Run this project locally
 67 | 
 68 | Running with `uvx` opens the Spotify redirect URI on every tool call. To avoid this, clone the repo and run the project locally:
 69 | 
 70 | ```bash
 71 | git clone https://github.com/varunneal/spotify-mcp.git
 72 | ```
 73 | 
 74 | Add it to your MCP Config like this:
 75 | 
 76 |   ```json
 77 |   "spotify": {
 78 |       "command": "uv",
 79 |       "args": [
 80 |         "--directory",
 81 |         "/path/to/spotify-mcp",
 82 |         "run",
 83 |         "spotify-mcp"
 84 |       ],
 85 |       "env": {
 86 |         "SPOTIFY_CLIENT_ID": "YOUR_CLIENT_ID",
 87 |         "SPOTIFY_CLIENT_SECRET": "YOUR_CLIENT_SECRET",
 88 |         "SPOTIFY_REDIRECT_URI": "http://127.0.0.1:8080/callback"
 89 |       }
 90 |     }
 91 |   ```
 92 | 
 93 | ### Troubleshooting
 94 | 
 95 | Please open an issue if you can't get this MCP working. Here are some tips:
 96 | 
 97 | 1. Make sure `uv` is updated. I recommend version `>=0.54`.
 98 | 2. If cloning locally, enable execution permissions for the project: `chmod -R 755`.
 99 | 3. Ensure you have Spotify Premium (required to use the developer API). 
100 | 
101 | This MCP will emit logs to stderr (as specified in the MCP spec). On Mac the Claude Desktop app should emit these logs
102 | to `~/Library/Logs/Claude`. 
103 | On other platforms [you can find logs here](https://modelcontextprotocol.io/quickstart/user#getting-logs-from-claude-for-desktop).
104 | 
105 | 
106 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
107 | 
108 | ```bash
109 | npx @modelcontextprotocol/inspector uv --directory /path/to/spotify-mcp run spotify-mcp
110 | ```
111 | 
112 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
113 | 
114 | ## TODO
115 | 
116 | Unfortunately, a bunch of cool features have [now been deprecated](https://techcrunch.com/2024/11/27/spotify-cuts-developer-access-to-several-of-its-recommendation-features/)
117 | from the Spotify API. Most new features will be relatively minor or for the health of the project:
118 | 
119 | - tests.
120 | - ~~adding API support for managing playlists.~~
121 | - adding API support for paginated search results/playlists/albums.
122 | 
123 | PRs appreciated! Thanks to @jamiew, @davidpadbury, @manncodes, @hyuma7, @aanurraj, @JJGO and others for contributions.  
124 | 
125 | [//]: # (## Deployment)
126 | 
127 | [//]: # (&#40;todo&#41;)
128 | 
129 | [//]: # (### Building and Publishing)
130 | 
131 | [//]: # ()
132 | [//]: # (To prepare the package for distribution:)
133 | 
134 | [//]: # ()
135 | [//]: # (1. Sync dependencies and update lockfile:)
136 | 
137 | [//]: # ()
138 | [//]: # (```bash)
139 | 
140 | [//]: # (uv sync)
141 | 
142 | [//]: # (```)
143 | 
144 | [//]: # ()
145 | [//]: # (2. Build package distributions:)
146 | 
147 | [//]: # ()
148 | [//]: # (```bash)
149 | 
150 | [//]: # (uv build)
151 | 
152 | [//]: # (```)
153 | 
154 | [//]: # ()
155 | [//]: # (This will create source and wheel distributions in the `dist/` directory.)
156 | 
157 | [//]: # ()
158 | [//]: # (3. Publish to PyPI:)
159 | 
160 | [//]: # ()
161 | [//]: # (```bash)
162 | 
163 | [//]: # (uv publish)
164 | 
165 | [//]: # (```)
166 | 
167 | [//]: # ()
168 | [//]: # (Note: You'll need to set PyPI credentials via environment variables or command flags:)
169 | 
170 | [//]: # ()
171 | [//]: # (- Token: `--token` or `UV_PUBLISH_TOKEN`)
172 | 
173 | [//]: # (- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`)
174 | 
```
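The README requires the redirect URI to use `http` and an explicit loopback address (IPv4 or IPv6). The check below is a minimal sketch of that rule, not part of this repository; the function name `is_loopback_redirect` is hypothetical and only the standard library is used.

```python
import ipaddress
from urllib.parse import urlparse


def is_loopback_redirect(uri: str) -> bool:
    """Return True if `uri` uses http and a literal loopback IP as its host.

    Hypothetical helper for illustration; hostnames such as 'localhost'
    are rejected because they are not explicit IP addresses.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "http" or parsed.hostname is None:
        return False
    try:
        # ip_address() raises ValueError for anything that is not an IP literal
        return ipaddress.ip_address(parsed.hostname).is_loopback
    except ValueError:
        return False


if __name__ == "__main__":
    for candidate in (
        "http://127.0.0.1:8080/callback",
        "http://[::1]:8080/callback",
        "http://localhost:8080/callback",
        "https://127.0.0.1:8080/callback",
    ):
        print(candidate, is_loopback_redirect(candidate))
```

Running a check like this on `SPOTIFY_REDIRECT_URI` before adding it to the MCP config may catch a mismatch earlier than the OAuth flow would.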

--------------------------------------------------------------------------------
/src/spotify_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | from . import server
 2 | import asyncio
 3 | 
 4 | def main():
 5 |     """Main entry point for the package."""
 6 |     asyncio.run(server.main())
 7 | 
 8 | # Optionally expose other important items at package level
 9 | __all__ = ['main', 'server']
10 | 
11 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "spotify-mcp"
 3 | version = "0.2.0"
 4 | description = "MCP spotify project"
 5 | readme = "README.md"
 6 | requires-python = ">=3.12"
 7 | dependencies = [
 8 |  "mcp==1.3.0",
 9 |  "python-dotenv>=1.0.1",
10 |  "spotipy==2.24.0",
11 | ]
12 | [[project.authors]]
13 | name = "Varun Srivastava"
14 | email = "[email protected]"
15 | 
16 | [build-system]
17 | requires = [ "hatchling",]
18 | build-backend = "hatchling.build"
19 | 
20 | [dependency-groups]
21 | dev = [
22 | ]
23 | 
24 | [tool.uv.sources]
25 | spotify-mcp = { workspace = true }
26 | 
27 | [project.scripts]
28 | spotify-mcp = "spotify_mcp:main"
29 | 
```

--------------------------------------------------------------------------------
/src/spotify_mcp/utils.py:
--------------------------------------------------------------------------------

```python
  1 | from collections import defaultdict
  2 | from typing import Optional, Dict
  3 | import functools
  4 | from typing import Callable, TypeVar
  5 | from typing import Optional, Dict
  6 | from urllib.parse import quote, urlparse, urlunparse
  7 | 
  8 | from requests import RequestException
  9 | 
 10 | T = TypeVar('T')
 11 | 
 12 | 
 13 | def normalize_redirect_uri(url: str) -> str:
 14 |     if not url:
 15 |         return url
 16 |         
 17 |     parsed = urlparse(url)
 18 |     
 19 |     # Convert localhost to 127.0.0.1
 20 |     if parsed.netloc == 'localhost' or parsed.netloc.startswith('localhost:'):
 21 |         port = ''
 22 |         if ':' in parsed.netloc:
 23 |             port = ':' + parsed.netloc.split(':')[1]
 24 |         parsed = parsed._replace(netloc=f'127.0.0.1{port}')
 25 |     
 26 |     return urlunparse(parsed)
 27 | 
 28 | def parse_track(track_item: dict, detailed=False) -> Optional[dict]:
 29 |     if not track_item:
 30 |         return None
 31 |     narrowed_item = {
 32 |         'name': track_item['name'],
 33 |         'id': track_item['id'],
 34 |     }
 35 | 
 36 |     if 'is_playing' in track_item:
 37 |         narrowed_item['is_playing'] = track_item['is_playing']
 38 | 
 39 |     if detailed:
 40 |         narrowed_item['album'] = parse_album(track_item.get('album'))
 41 |         for k in ['track_number', 'duration_ms']:
 42 |             narrowed_item[k] = track_item.get(k)
 43 | 
 44 |     if not track_item.get('is_playable', True):
 45 |         narrowed_item['is_playable'] = False
 46 | 
 47 |     artists = [a['name'] for a in track_item['artists']]
 48 |     if detailed:
 49 |         artists = [parse_artist(a) for a in track_item['artists']]
 50 | 
 51 |     if len(artists) == 1:
 52 |         narrowed_item['artist'] = artists[0]
 53 |     else:
 54 |         narrowed_item['artists'] = artists
 55 | 
 56 |     return narrowed_item
 57 | 
 58 | 
 59 | def parse_artist(artist_item: dict, detailed=False) -> Optional[dict]:
 60 |     if not artist_item:
 61 |         return None
 62 |     narrowed_item = {
 63 |         'name': artist_item['name'],
 64 |         'id': artist_item['id'],
 65 |     }
 66 |     if detailed:
 67 |         narrowed_item['genres'] = artist_item.get('genres')
 68 | 
 69 |     return narrowed_item
 70 | 
 71 | 
 72 | def parse_playlist(playlist_item: dict, username, detailed=False) -> Optional[dict]:
 73 |     if not playlist_item:
 74 |         return None
 75 |     narrowed_item = {
 76 |         'name': playlist_item['name'],
 77 |         'id': playlist_item['id'],
 78 |         'owner': playlist_item['owner']['display_name'],
 79 |         'user_is_owner': playlist_item['owner']['display_name'] == username,
 80 |         'total_tracks': playlist_item['tracks']['total'],
 81 |     }
 82 |     if detailed:
 83 |         narrowed_item['description'] = playlist_item.get('description')
 84 |         tracks = []
 85 |         for t in playlist_item['tracks']['items']:
 86 |             tracks.append(parse_track(t['track']))
 87 |         narrowed_item['tracks'] = tracks
 88 | 
 89 |     return narrowed_item
 90 | 
 91 | 
 92 | def parse_album(album_item: dict, detailed=False) -> dict:
 93 |     narrowed_item = {
 94 |         'name': album_item['name'],
 95 |         'id': album_item['id'],
 96 |     }
 97 | 
 98 |     artists = [a['name'] for a in album_item['artists']]
 99 | 
100 |     if detailed:
101 |         tracks = []
102 |         for t in album_item['tracks']['items']:
103 |             tracks.append(parse_track(t))
104 |         narrowed_item["tracks"] = tracks
105 |         artists = [parse_artist(a) for a in album_item['artists']]
106 | 
107 |         for k in ['total_tracks', 'release_date', 'genres']:
108 |             narrowed_item[k] = album_item.get(k)
109 | 
110 |     if len(artists) == 1:
111 |         narrowed_item['artist'] = artists[0]
112 |     else:
113 |         narrowed_item['artists'] = artists
114 | 
115 |     return narrowed_item
116 | 
117 | 
118 | def parse_search_results(results: Dict, qtype: str, username: Optional[str] = None):
119 |     _results = defaultdict(list)
120 |     # potential
121 |     # if username:
122 |     #     _results['User Spotify URI'] = username
123 | 
124 |     for q in qtype.split(","):
125 |         match q:
126 |             case "track":
127 |                 for idx, item in enumerate(results['tracks']['items']):
128 |                     if not item: continue
129 |                     _results['tracks'].append(parse_track(item))
130 |             case "artist":
131 |                 for idx, item in enumerate(results['artists']['items']):
132 |                     if not item: continue
133 |                     _results['artists'].append(parse_artist(item))
134 |             case "playlist":
135 |                 for idx, item in enumerate(results['playlists']['items']):
136 |                     if not item: continue
137 |                     _results['playlists'].append(parse_playlist(item, username))
138 |             case "album":
139 |                 for idx, item in enumerate(results['albums']['items']):
140 |                     if not item: continue
141 |                     _results['albums'].append(parse_album(item))
142 |             case _:
143 |                 raise ValueError(f"Unknown qtype {q}")
144 | 
145 |     return dict(_results)
146 | 
147 | def parse_tracks(items: list) -> list:
148 |     """
149 |     Parse a list of track items and return a list of parsed tracks.
150 | 
151 |     Args:
152 |         items: List of track items
153 |     Returns:
154 |         List of parsed tracks
155 |     """ 
156 |     tracks = []
157 |     for idx, item in enumerate(items):
158 |         if not item:
159 |             continue
160 |         tracks.append(parse_track(item['track']))
161 |     return tracks
162 | 
163 | 
164 | def build_search_query(base_query: str,
165 |                        artist: Optional[str] = None,
166 |                        track: Optional[str] = None,
167 |                        album: Optional[str] = None,
168 |                        year: Optional[str] = None,
169 |                        year_range: Optional[tuple[int, int]] = None,
170 |                        # upc: Optional[str] = None,
171 |                        # isrc: Optional[str] = None,
172 |                        genre: Optional[str] = None,
173 |                        is_hipster: bool = False,
174 |                        is_new: bool = False
175 |                        ) -> str:
176 |     """
177 |     Build a search query string with optional filters.
178 | 
179 |     Args:
180 |         base_query: Base search term
181 |         artist: Artist name filter
182 |         track: Track name filter
183 |         album: Album name filter
184 |         year: Specific year filter
185 |         year_range: Tuple of (start_year, end_year) for year range filter
186 |         genre: Genre filter
187 |         is_hipster: Filter for lowest 10% popularity albums
188 |         is_new: Filter for albums released in past two weeks
189 | 
190 |     Returns:
191 |         Encoded query string with applied filters
192 |     """
193 |     filters = []
194 | 
195 |     if artist:
196 |         filters.append(f"artist:{artist}")
197 |     if track:
198 |         filters.append(f"track:{track}")
199 |     if album:
200 |         filters.append(f"album:{album}")
201 |     if year:
202 |         filters.append(f"year:{year}")
203 |     if year_range:
204 |         filters.append(f"year:{year_range[0]}-{year_range[1]}")
205 |     if genre:
206 |         filters.append(f"genre:{genre}")
207 |     if is_hipster:
208 |         filters.append("tag:hipster")
209 |     if is_new:
210 |         filters.append("tag:new")
211 | 
212 |     query_parts = [base_query] + filters
213 |     return quote(" ".join(query_parts))
214 | 
215 | 
216 | def validate(func: Callable[..., T]) -> Callable[..., T]:
217 |     """
218 |     Decorator for Spotify API methods that handles authentication and device validation.
219 |     - Checks and refreshes authentication if needed
220 |     - Validates active device and retries with candidate device if needed
221 |     """
222 | 
223 |     @functools.wraps(func)
224 |     def wrapper(self, *args, **kwargs):
225 |         # Handle authentication
226 |         if not self.auth_ok():
227 |             self.auth_refresh()
228 | 
229 |         # Handle device validation
230 |         if not self.is_active_device():
231 |             kwargs['device'] = self._get_candidate_device()
232 | 
233 |         # TODO: try-except RequestException
234 |         return func(self, *args, **kwargs)
235 | 
236 |     return wrapper
237 | 
238 | def ensure_username(func):
239 |     """
240 |     Decorator to ensure that the username is set before calling the function.
241 |     """
242 |     @functools.wraps(func)
243 |     def wrapper(self, *args, **kwargs):
244 |         if self.username is None:
245 |             self.set_username()
246 |         return func(self, *args, **kwargs)
247 |     return wrapper
248 | 
```
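The `validate` decorator above injects a `device` keyword argument when no device is active. The sketch below isolates that pattern with an in-memory stand-in so it can be run without Spotify credentials; `with_device` and `FakeClient` are hypothetical names, and the authentication step of `validate` is deliberately left out.

```python
import functools


def with_device(func):
    """Simplified stand-in for utils.validate (device handling only)."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # If nothing is active, fall back to a candidate device
        if not self.is_active_device():
            kwargs["device"] = self._get_candidate_device()
        return func(self, *args, **kwargs)

    return wrapper


class FakeClient:
    """Hypothetical client backed by a plain list; makes no API calls."""

    def __init__(self, devices):
        self._devices = devices

    def is_active_device(self):
        return any(d.get("is_active") for d in self._devices)

    def _get_candidate_device(self):
        if not self._devices:
            raise ConnectionError("No active device. Is Spotify open?")
        return self._devices[0]

    @with_device
    def pause_playback(self, device=None):
        # Return the device ID that would be passed to the API, if any
        return device["id"] if device else None


if __name__ == "__main__":
    idle = FakeClient([{"id": "abc", "name": "Laptop", "is_active": False}])
    print(idle.pause_playback())
```

When a device is already active the wrapper leaves `device` unset, so the decorated method falls through to its default of `None`.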

--------------------------------------------------------------------------------
/src/spotify_mcp/spotify_api.py:
--------------------------------------------------------------------------------

```python
  1 | import logging
  2 | import os
  3 | from typing import Optional, Dict, List
  4 | 
  5 | import spotipy
  6 | from dotenv import load_dotenv
  7 | from spotipy.cache_handler import CacheFileHandler
  8 | from spotipy.oauth2 import SpotifyOAuth
  9 | 
 10 | from . import utils
 11 | 
 12 | load_dotenv()
 13 | 
 14 | CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
 15 | CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
 16 | REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI")
 17 | 
 18 | # Normalize the redirect URI to meet Spotify's requirements
 19 | if REDIRECT_URI:
 20 |     REDIRECT_URI = utils.normalize_redirect_uri(REDIRECT_URI)
 21 | 
 22 | SCOPES = ["user-read-currently-playing", "user-read-playback-state",  # spotify connect
 23 |           "app-remote-control", "streaming",  # playback
 24 |           "playlist-read-private", "playlist-read-collaborative", "playlist-modify-private", "playlist-modify-public",
 25 |           # playlists
 26 |           "user-read-playback-position", "user-top-read", "user-read-recently-played",  # listening history
 27 |           "user-library-modify", "user-library-read",  # library
 28 |           ]
 29 | 
 30 | 
 31 | class Client:
 32 |     def __init__(self, logger: logging.Logger):
 33 |         """Initialize Spotify client with necessary permissions"""
 34 |         self.logger = logger
 35 | 
 36 |         scope = "user-library-read,user-read-playback-state,user-modify-playback-state,user-read-currently-playing,playlist-read-private,playlist-read-collaborative,playlist-modify-private,playlist-modify-public"
 37 | 
 38 |         try:
 39 |             self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
 40 |                 scope=scope,
 41 |                 client_id=CLIENT_ID,
 42 |                 client_secret=CLIENT_SECRET,
 43 |                 redirect_uri=REDIRECT_URI))
 44 | 
 45 |             self.auth_manager: SpotifyOAuth = self.sp.auth_manager
 46 |             self.cache_handler: CacheFileHandler = self.auth_manager.cache_handler
 47 |         except Exception as e:
 48 |             self.logger.error(f"Failed to initialize Spotify client: {str(e)}")
 49 |             raise
 50 | 
 51 |         self.username = None
 52 | 
 53 |     @utils.validate
 54 |     def set_username(self, device=None):
 55 |         self.username = self.sp.current_user()['display_name']
 56 | 
 57 |     @utils.validate
 58 |     def search(self, query: str, qtype: str = 'track', limit=10, device=None):
 59 |         """
 60 |         Searches based on the query term.
 61 |         - query: query term
 62 |         - qtype: the types of items to return. One or more of 'artist', 'album',  'track', 'playlist'.
 63 |                  If multiple types are desired, pass in a comma separated string; e.g. 'track,album'
 64 |         - limit: max # items to return
 65 |         """
 66 |         if self.username is None:
 67 |             self.set_username()
 68 |         results = self.sp.search(q=query, limit=limit, type=qtype)
 69 |         if not results:
 70 |             raise ValueError("No search results found.")
 71 |         return utils.parse_search_results(results, qtype, self.username)
 72 | 
 73 |     def recommendations(self, artists: Optional[List] = None, tracks: Optional[List] = None, limit=20):
 74 |         # doesn't work: Spotify has cut developer access to its recommendation features (see README)
 75 |         recs = self.sp.recommendations(seed_artists=artists, seed_tracks=tracks, limit=limit)
 76 |         return recs
 77 | 
 78 |     def get_info(self, item_uri: str) -> dict:
 79 |         """
 80 |         Returns more info about item.
 81 |         - item_uri: uri. Looks like 'spotify:track:xxxxxx', 'spotify:album:xxxxxx', etc.
 82 |         """
 83 |         _, qtype, item_id = item_uri.split(":")
 84 |         match qtype:
 85 |             case 'track':
 86 |                 return utils.parse_track(self.sp.track(item_id), detailed=True)
 87 |             case 'album':
 88 |                 album_info = utils.parse_album(self.sp.album(item_id), detailed=True)
 89 |                 return album_info
 90 |             case 'artist':
 91 |                 artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True)
 92 |                 albums = self.sp.artist_albums(item_id)
 93 |                 top_tracks = self.sp.artist_top_tracks(item_id)['tracks']
 94 |                 albums_and_tracks = {
 95 |                     'albums': albums,
 96 |                     'tracks': {'items': top_tracks}
 97 |                 }
 98 |                 parsed_info = utils.parse_search_results(albums_and_tracks, qtype="album,track")
 99 |                 artist_info['top_tracks'] = parsed_info['tracks']
100 |                 artist_info['albums'] = parsed_info['albums']
101 | 
102 |                 return artist_info
103 |             case 'playlist':
104 |                 if self.username is None:
105 |                     self.set_username()
106 |                 playlist = self.sp.playlist(item_id)
107 |                 self.logger.info(f"playlist info is {playlist}")
108 |                 playlist_info = utils.parse_playlist(playlist, self.username, detailed=True)
109 | 
110 |                 return playlist_info
111 | 
112 |         raise ValueError(f"Unknown qtype {qtype}")
113 | 
114 |     def get_current_track(self) -> Optional[Dict]:
115 |         """Get information about the currently playing track"""
116 |         try:
117 |             # current_playback vs current_user_playing_track?
118 |             current = self.sp.current_user_playing_track()
119 |             if not current:
120 |                 self.logger.info("No playback session found")
121 |                 return None
122 |             if current.get('currently_playing_type') != 'track':
123 |                 self.logger.info("Current playback is not a track")
124 |                 return None
125 | 
126 |             track_info = utils.parse_track(current['item'])
127 |             if 'is_playing' in current:
128 |                 track_info['is_playing'] = current['is_playing']
129 | 
130 |             self.logger.info(
131 |                 f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}")
132 |             return track_info
133 |         except Exception as e:
134 |             self.logger.error("Error getting current track info.")
135 |             raise
136 | 
137 |     @utils.validate
138 |     def start_playback(self, spotify_uri=None, device=None):
139 |         """
140 |         Starts spotify playback of uri. If spotify_uri is omitted, resumes current playback.
141 |         - spotify_uri: ID of resource to play, or None. Typically looks like 'spotify:track:xxxxxx' or 'spotify:album:xxxxxx'.
142 |         """
143 |         try:
144 |             self.logger.info(f"Starting playback for spotify_uri: {spotify_uri} on {device}")
145 |             if not spotify_uri:
146 |                 if self.is_track_playing():
147 |                     self.logger.info("No track_id provided and playback already active.")
148 |                     return
149 |                 if not self.get_current_track():
150 |                     raise ValueError("No track_id provided and no current playback to resume.")
151 | 
152 |             if spotify_uri is not None:
153 |                 if spotify_uri.startswith('spotify:track:'):
154 |                     uris = [spotify_uri]
155 |                     context_uri = None
156 |                 else:
157 |                     uris = None
158 |                     context_uri = spotify_uri
159 |             else:
160 |                 uris = None
161 |                 context_uri = None
162 | 
163 |             device_id = device.get('id') if device else None
164 | 
165 |             self.logger.info(f"Starting playback on {device}: context_uri={context_uri}, uris={uris}")
166 |             result = self.sp.start_playback(uris=uris, context_uri=context_uri, device_id=device_id)
167 |             self.logger.info(f"Playback result: {result}")
168 |             return result
169 |         except Exception as e:
170 |             self.logger.error(f"Error starting playback: {str(e)}.")
171 |             raise
172 | 
173 |     @utils.validate
174 |     def pause_playback(self, device=None):
175 |         """Pauses playback."""
176 |         playback = self.sp.current_playback()
177 |         if playback and playback.get('is_playing'):
178 |             self.sp.pause_playback(device.get('id') if device else None)
179 | 
180 |     @utils.validate
181 |     def add_to_queue(self, track_id: str, device=None):
182 |         """
183 |         Adds track to queue.
184 |         - track_id: ID of track to add to the queue.
185 |         """
186 |         self.sp.add_to_queue(track_id, device.get('id') if device else None)
187 | 
188 |     @utils.validate
189 |     def get_queue(self, device=None):
190 |         """Returns the current queue of tracks."""
191 |         queue_info = self.sp.queue()
192 |         queue_info['currently_playing'] = self.get_current_track()
193 | 
194 |         queue_info['queue'] = [utils.parse_track(track) for track in queue_info.pop('queue')]
195 | 
196 |         return queue_info
197 | 
198 |     def get_liked_songs(self):
199 |         # todo
200 |         results = self.sp.current_user_saved_tracks()
201 |         for idx, item in enumerate(results['items']):
202 |             track = item['track']
203 |             print(idx, track['artists'][0]['name'], " – ", track['name'])
204 | 
205 |     def is_track_playing(self) -> bool:
206 |         """Returns if a track is actively playing."""
207 |         curr_track = self.get_current_track()
208 |         if not curr_track:
209 |             return False
210 |         if curr_track.get('is_playing'):
211 |             return True
212 |         return False
213 | 
214 |     def get_current_user_playlists(self, limit=50) -> List[Dict]:
215 |         """
216 |         Get current user's playlists.
217 |         - limit: Max number of playlists to return.
218 |         """
219 |         playlists = self.sp.current_user_playlists(limit=limit)
220 |         if not playlists:
221 |             raise ValueError("No playlists found.")
222 |         return [utils.parse_playlist(playlist, self.username) for playlist in playlists['items']]
223 |     
224 |     @utils.ensure_username
225 |     def get_playlist_tracks(self, playlist_id: str, limit=50) -> List[Dict]:
226 |         """
227 |         Get tracks from a playlist.
228 |         - playlist_id: ID of the playlist to get tracks from.
229 |         - limit: Max number of tracks to return.
230 |         """
231 |         playlist = self.sp.playlist(playlist_id)
232 |         if not playlist:
233 |             raise ValueError("No playlist found.")
234 |         return utils.parse_tracks(playlist['tracks']['items'])
235 |     
236 |     @utils.ensure_username
237 |     def add_tracks_to_playlist(self, playlist_id: str, track_ids: List[str], position: Optional[int] = None):
238 |         """
239 |         Add tracks to a playlist.
240 |         - playlist_id: ID of the playlist to modify.
241 |         - track_ids: List of track IDs to add.
242 |         - position: Position to insert the tracks at (optional).
243 |         """
244 |         if not playlist_id:
245 |             raise ValueError("No playlist ID provided.")
246 |         if not track_ids:
247 |             raise ValueError("No track IDs provided.")
248 |         
249 |         try:
250 |             response = self.sp.playlist_add_items(playlist_id, track_ids, position=position)
251 |             self.logger.info(f"Response from adding tracks: {track_ids} to playlist {playlist_id}: {response}")
252 |         except Exception as e:
253 |             self.logger.error(f"Error adding tracks to playlist: {str(e)}")
254 | 
255 |     @utils.ensure_username
256 |     def remove_tracks_from_playlist(self, playlist_id: str, track_ids: List[str]):
257 |         """
258 |         Remove tracks from a playlist.
259 |         - playlist_id: ID of the playlist to modify.
260 |         - track_ids: List of track IDs to remove.
261 |         """
262 |         if not playlist_id:
263 |             raise ValueError("No playlist ID provided.")
264 |         if not track_ids:
265 |             raise ValueError("No track IDs provided.")
266 |         
267 |         try:
268 |             response = self.sp.playlist_remove_all_occurrences_of_items(playlist_id, track_ids)
269 |             self.logger.info(f"Response from removing tracks: {track_ids} from playlist {playlist_id}: {response}")
270 |         except Exception as e:
271 |             self.logger.error(f"Error removing tracks from playlist: {str(e)}")
272 | 
273 |     @utils.ensure_username
274 |     def create_playlist(self, name: str, description: Optional[str] = None, public: bool = True):
275 |         """
276 |         Create a new playlist.
277 |         - name: Name for the playlist.
278 |         - description: Description for the playlist.
279 |         - public: Whether the playlist should be public.
280 |         """
281 |         if not name:
282 |             raise ValueError("Playlist name is required.")
283 |         
284 |         try:
285 |             user = self.sp.current_user()
286 |             user_id = user['id']
287 |             
288 |             playlist = self.sp.user_playlist_create(
289 |                 user=user_id,
290 |                 name=name,
291 |                 public=public,
292 |                 description=description
293 |             )
294 |             self.logger.info(f"Created playlist: {name} (ID: {playlist['id']})")
295 |             return utils.parse_playlist(playlist, self.username, detailed=True)
296 |         except Exception as e:
297 |             self.logger.error(f"Error creating playlist: {str(e)}")
298 |             raise
299 | 
300 |     @utils.ensure_username
301 |     def change_playlist_details(self, playlist_id: str, name: Optional[str] = None, description: Optional[str] = None):
302 |         """
303 |         Change playlist details.
304 |         - playlist_id: ID of the playlist to modify.
305 |         - name: New name for the playlist.
306 |         - description: New description for the playlist.
307 |         Visibility (public) cannot be changed through this method.
308 |         """
309 |         if not playlist_id:
310 |             raise ValueError("No playlist ID provided.")
311 |         
312 |         try:
313 |             response = self.sp.playlist_change_details(playlist_id, name=name, description=description)
314 |             self.logger.info(f"Response from changing playlist details: {response}")
315 |         except Exception as e:
316 |             self.logger.error(f"Error changing playlist details: {str(e)}")
317 |             raise  # Propagate so the caller does not report success after a failed update.
318 |     def get_devices(self) -> list:
319 |         return self.sp.devices()['devices']
320 | 
321 |     def is_active_device(self):
322 |         return any(device.get('is_active') for device in self.get_devices())
323 | 
324 |     def _get_candidate_device(self):
325 |         devices = self.get_devices()
326 |         if not devices:
327 |             raise ConnectionError("No devices found. Is Spotify open?")
328 |         for device in devices:
329 |             if device.get('is_active'):
330 |                 return device
331 |         self.logger.info(f"No active device, assigning {devices[0]['name']}.")
332 |         return devices[0]
333 | 
334 |     def auth_ok(self) -> bool:
335 |         try:
336 |             token = self.cache_handler.get_cached_token()
337 |             if token is None:
338 |                 self.logger.info("Auth check result: no token exists")
339 |                 return False
340 |                 
341 |             is_expired = self.auth_manager.is_token_expired(token)
342 |             self.logger.info(f"Auth check result: {'valid' if not is_expired else 'expired'}")
343 |             return not is_expired  # Return True if token is NOT expired
344 |         except Exception as e:
345 |             self.logger.error(f"Error checking auth status: {str(e)}")
346 |             return False  # Return False on error rather than raising
347 | 
348 |     def auth_refresh(self):
349 |         self.auth_manager.validate_token(self.cache_handler.get_cached_token())
350 | 
351 |     def skip_track(self, n=1):
352 |         # todo: Better error handling
353 |         for _ in range(n):
354 |             self.sp.next_track()
355 | 
356 |     def previous_track(self):
357 |         self.sp.previous_track()
358 | 
359 |     def seek_to_position(self, position_ms):
360 |         self.sp.seek_track(position_ms=position_ms)
361 | 
362 |     def set_volume(self, volume_percent):
363 |         self.sp.volume(volume_percent)
364 | 
```
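
The fallback in `_get_candidate_device` above can be read as a pure function over the device list. The following is a minimal sketch, assuming each device is a plain dict with `name` and `is_active` keys, as the code above reads them from `sp.devices()`. The name `pick_device` and the sample data are hypothetical and not part of this repository.

```python
from typing import Dict, List


def pick_device(devices: List[Dict]) -> Dict:
    """Return the active device, or fall back to the first listed one.

    Hypothetical helper mirroring the selection order used above.
    """
    if not devices:
        # Nothing to fall back to: no Spotify client is reachable.
        raise ConnectionError("No devices found. Is Spotify open?")
    for device in devices:
        if device.get("is_active"):
            return device
    # No device is active, so take the first one that was listed.
    return devices[0]


# Illustrative data only; real entries carry more fields.
sample = [
    {"name": "Laptop", "is_active": False},
    {"name": "Phone", "is_active": True},
]
print(pick_device(sample)["name"])
```

Keeping the selection free of API calls makes the three cases (no devices, one active device, no active device) easy to exercise without a Spotify account.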

--------------------------------------------------------------------------------
/src/spotify_mcp/server.py:
--------------------------------------------------------------------------------

```python
  1 | import asyncio
  2 | import base64
  3 | import os
  4 | import logging
  5 | import sys
  6 | from enum import Enum
  7 | import json
  8 | from typing import List, Optional, Tuple
  9 | from datetime import datetime
 10 | from pathlib import Path
 11 | 
 12 | import mcp.types as types
 13 | from mcp.server import NotificationOptions, Server
 14 | import mcp.server.stdio
 15 | from pydantic import BaseModel, Field, AnyUrl
 16 | from spotipy import SpotifyException
 17 | 
 18 | from . import spotify_api
 19 | from .utils import normalize_redirect_uri
 20 | 
 21 | 
 22 | def setup_logger():
 23 |     class Logger:
 24 |         def info(self, message):
 25 |             print(f"[INFO] {message}", file=sys.stderr)
 26 | 
 27 |         def error(self, message):
 28 |             print(f"[ERROR] {message}", file=sys.stderr)
 29 | 
 30 |     return Logger()
 31 | 
 32 | 
 33 | logger = setup_logger()
 34 | # Normalize the redirect URI to meet Spotify's requirements
 35 | if spotify_api.REDIRECT_URI:
 36 |     spotify_api.REDIRECT_URI = normalize_redirect_uri(spotify_api.REDIRECT_URI)
 37 | spotify_client = spotify_api.Client(logger)
 38 | 
 39 | server = Server("spotify-mcp")
 40 | 
 41 | 
 42 | # Base class: each subclass is exposed as an MCP tool named "Spotify<ClassName>".
 43 | class ToolModel(BaseModel):
 44 |     @classmethod
 45 |     def as_tool(cls):
 46 |         return types.Tool(
 47 |             name="Spotify" + cls.__name__,
 48 |             description=cls.__doc__,
 49 |             inputSchema=cls.model_json_schema()
 50 |         )
 51 | 
 52 | 
 53 | class Playback(ToolModel):
 54 |     """Manages the current playback with the following actions:
 55 |     - get: Get information about user's current track.
 56 |     - start: Starts playing new item or resumes current playback if called with no uri.
 57 |     - pause: Pauses current playback.
 58 |     - skip: Skips current track.
 59 |     """
 60 |     action: str = Field(description="Action to perform: 'get', 'start', 'pause' or 'skip'.")
 61 |     spotify_uri: Optional[str] = Field(default=None, description="Spotify uri of item to play for 'start' action. " +
 62 |                                                                  "If omitted, resumes current playback.")
 63 |     num_skips: Optional[int] = Field(default=1, description="Number of tracks to skip for `skip` action.")
 64 | 
 65 | 
 66 | class Queue(ToolModel):
 67 |     """Manage the playback queue - get the queue or add tracks."""
 68 |     action: str = Field(description="Action to perform: 'add' or 'get'.")
 69 |     track_id: Optional[str] = Field(default=None, description="Track ID to add to queue (required for add action)")
 70 | 
 71 | 
 72 | class GetInfo(ToolModel):
 73 |     """Get detailed information about a Spotify item (track, album, artist, or playlist)."""
 74 |     item_uri: str = Field(description="URI of the item to get information about. " +
 75 |                                       "If 'playlist' or 'album', returns its tracks. " +
 76 |                                       "If 'artist', returns albums and top tracks.")
 77 | 
 78 | 
 79 | class Search(ToolModel):
 80 |     """Search for tracks, albums, artists, or playlists on Spotify."""
 81 |     query: str = Field(description="query term")
 82 |     qtype: Optional[str] = Field(default="track",
 83 |                                  description="Type of items to search for (track, album, artist, playlist, " +
 84 |                                              "or comma-separated combination)")
 85 |     limit: Optional[int] = Field(default=10, description="Maximum number of items to return")
 86 | 
 87 | 
 88 | class Playlist(ToolModel):
 89 |     """Manage Spotify playlists.
 90 |     - get: Get a list of user's playlists.
 91 |     - get_tracks: Get tracks in a specific playlist.
 92 |     - add_tracks: Add tracks to a specific playlist.
 93 |     - remove_tracks: Remove tracks from a specific playlist.
 94 |     - change_details: Change details of a specific playlist.
 95 |     - create: Create a new playlist.
 96 |     """
 97 |     action: str = Field(
 98 |         description="Action to perform: 'get', 'get_tracks', 'add_tracks', 'remove_tracks', 'change_details', 'create'.")
 99 |     playlist_id: Optional[str] = Field(default=None, description="ID of the playlist to manage.")
100 |     track_ids: Optional[List[str]] = Field(default=None, description="List of track IDs to add/remove.")
101 |     name: Optional[str] = Field(default=None, description="Name for the playlist (required for create, optional for change_details).")
102 |     description: Optional[str] = Field(default=None, description="Description for the playlist.")
103 |     public: Optional[bool] = Field(default=True, description="Whether the playlist should be public (for create action).")
104 | 
105 | 
106 | @server.list_prompts()
107 | async def handle_list_prompts() -> list[types.Prompt]:
108 |     return []
109 | 
110 | 
111 | @server.list_resources()
112 | async def handle_list_resources() -> list[types.Resource]:
113 |     return []
114 | 
115 | 
116 | @server.list_tools()
117 | async def handle_list_tools() -> list[types.Tool]:
118 |     """List available tools."""
119 |     logger.info("Listing available tools")
120 |     # await server.request_context.session.send_notification("are you receiving this notification?")
121 |     tools = [
122 |         Playback.as_tool(),
123 |         Search.as_tool(),
124 |         Queue.as_tool(),
125 |         GetInfo.as_tool(),
126 |         Playlist.as_tool(),
127 |     ]
128 |     logger.info(f"Available tools: {[tool.name for tool in tools]}")
129 |     return tools
130 | 
131 | 
132 | @server.call_tool()
133 | async def handle_call_tool(
134 |         name: str, arguments: dict | None
135 | ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
136 |     """Handle tool execution requests."""
137 |     logger.info(f"Tool called: {name} with arguments: {arguments}")
138 |     assert name[:7] == "Spotify", f"Unknown tool: {name}"
139 |     try:
140 |         match name[7:]:
141 |             case "Playback":
142 |                 action = arguments.get("action")
143 |                 match action:
144 |                     case "get":
145 |                         logger.info("Attempting to get current track")
146 |                         curr_track = spotify_client.get_current_track()
147 |                         if curr_track:
148 |                             logger.info(f"Current track retrieved: {curr_track.get('name', 'Unknown')}")
149 |                             return [types.TextContent(
150 |                                 type="text",
151 |                                 text=json.dumps(curr_track, indent=2)
152 |                             )]
153 |                         logger.info("No track currently playing")
154 |                         return [types.TextContent(
155 |                             type="text",
156 |                             text="No track playing."
157 |                         )]
158 |                     case "start":
159 |                         logger.info(f"Starting playback with arguments: {arguments}")
160 |                         spotify_client.start_playback(spotify_uri=arguments.get("spotify_uri"))
161 |                         logger.info("Playback started successfully")
162 |                         return [types.TextContent(
163 |                             type="text",
164 |                             text="Playback starting."
165 |                         )]
166 |                     case "pause":
167 |                         logger.info("Attempting to pause playback")
168 |                         spotify_client.pause_playback()
169 |                         logger.info("Playback paused successfully")
170 |                         return [types.TextContent(
171 |                             type="text",
172 |                             text="Playback paused."
173 |                         )]
174 |                     case "skip":
175 |                         num_skips = int(arguments.get("num_skips", 1))
176 |                         logger.info(f"Skipping {num_skips} tracks.")
177 |                         spotify_client.skip_track(n=num_skips)
178 |                         return [types.TextContent(
179 |                             type="text",
180 |                             text="Skipped to next track."
181 |                         )]
182 | 
183 |             case "Search":
184 |                 logger.info(f"Performing search with arguments: {arguments}")
185 |                 search_results = spotify_client.search(
186 |                     query=arguments.get("query", ""),
187 |                     qtype=arguments.get("qtype", "track"),
188 |                     limit=arguments.get("limit", 10)
189 |                 )
190 |                 logger.info("Search completed successfully.")
191 |                 return [types.TextContent(
192 |                     type="text",
193 |                     text=json.dumps(search_results, indent=2)
194 |                 )]
195 | 
196 |             case "Queue":
197 |                 logger.info(f"Queue operation with arguments: {arguments}")
198 |                 action = arguments.get("action")
199 | 
200 |                 match action:
201 |                     case "add":
202 |                         track_id = arguments.get("track_id")
203 |                         if not track_id:
204 |                             logger.error("track_id is required for add to queue.")
205 |                             return [types.TextContent(
206 |                                 type="text",
207 |                                 text="track_id is required for add action"
208 |                             )]
209 |                         spotify_client.add_to_queue(track_id)
210 |                         return [types.TextContent(
211 |                             type="text",
212 |                             text="Track added to queue."
213 |                         )]
214 | 
215 |                     case "get":
216 |                         queue = spotify_client.get_queue()
217 |                         return [types.TextContent(
218 |                             type="text",
219 |                             text=json.dumps(queue, indent=2)
220 |                         )]
221 | 
222 |                     case _:
223 |                         return [types.TextContent(
224 |                             type="text",
225 |                             text=f"Unknown queue action: {action}. Supported actions are: add and get."
226 |                         )]
227 | 
228 |             case "GetInfo":
229 |                 logger.info(f"Getting item info with arguments: {arguments}")
230 |                 item_info = spotify_client.get_info(
231 |                     item_uri=arguments.get("item_uri")
232 |                 )
233 |                 return [types.TextContent(
234 |                     type="text",
235 |                     text=json.dumps(item_info, indent=2)
236 |                 )]
237 | 
238 |             case "Playlist":
239 |                 logger.info(f"Playlist operation with arguments: {arguments}")
240 |                 action = arguments.get("action")
241 |                 match action:
242 |                     case "get":
243 |                         logger.info(f"Getting current user's playlists with arguments: {arguments}")
244 |                         playlists = spotify_client.get_current_user_playlists()
245 |                         return [types.TextContent(
246 |                             type="text",
247 |                             text=json.dumps(playlists, indent=2)
248 |                         )]
249 |                     case "get_tracks":
250 |                         logger.info(f"Getting tracks in playlist with arguments: {arguments}")
251 |                         if not arguments.get("playlist_id"):
252 |                             logger.error("playlist_id is required for get_tracks action.")
253 |                             return [types.TextContent(
254 |                                 type="text",
255 |                                 text="playlist_id is required for get_tracks action."
256 |                             )]
257 |                         tracks = spotify_client.get_playlist_tracks(arguments.get("playlist_id"))
258 |                         return [types.TextContent(
259 |                             type="text",
260 |                             text=json.dumps(tracks, indent=2)
261 |                         )]
262 |                     case "add_tracks":
263 |                         logger.info(f"Adding tracks to playlist with arguments: {arguments}")
264 |                         track_ids = arguments.get("track_ids")
265 |                         if isinstance(track_ids, str):
266 |                             try:
267 |                                 track_ids = json.loads(track_ids)  # Convert JSON string to Python list
268 |                             except json.JSONDecodeError:
269 |                                 logger.error("track_ids must be a list or a valid JSON array.")
270 |                                 return [types.TextContent(
271 |                                     type="text",
272 |                                     text="Error: track_ids must be a list or a valid JSON array."
273 |                                 )]
274 | 
275 |                         spotify_client.add_tracks_to_playlist(
276 |                             playlist_id=arguments.get("playlist_id"),
277 |                             track_ids=track_ids
278 |                         )
279 |                         return [types.TextContent(
280 |                             type="text",
281 |                             text="Tracks added to playlist."
282 |                         )]
283 |                     case "remove_tracks":
284 |                         logger.info(f"Removing tracks from playlist with arguments: {arguments}")
285 |                         track_ids = arguments.get("track_ids")
286 |                         if isinstance(track_ids, str):
287 |                             try:
288 |                                 track_ids = json.loads(track_ids)  # Convert JSON string to Python list
289 |                             except json.JSONDecodeError:
290 |                                 logger.error("track_ids must be a list or a valid JSON array.")
291 |                                 return [types.TextContent(
292 |                                     type="text",
293 |                                     text="Error: track_ids must be a list or a valid JSON array."
294 |                                 )]
295 | 
296 |                         spotify_client.remove_tracks_from_playlist(
297 |                             playlist_id=arguments.get("playlist_id"),
298 |                             track_ids=track_ids
299 |                         )
300 |                         return [types.TextContent(
301 |                             type="text",
302 |                             text="Tracks removed from playlist."
303 |                         )]
304 | 
305 |                     case "change_details":
306 |                         logger.info(f"Changing playlist details with arguments: {arguments}")
307 |                         if not arguments.get("playlist_id"):
308 |                             logger.error("playlist_id is required for change_details action.")
309 |                             return [types.TextContent(
310 |                                 type="text",
311 |                                 text="playlist_id is required for change_details action."
312 |                             )]
313 |                         if not arguments.get("name") and not arguments.get("description"):
314 |                             logger.error("At least one of name or description is required.")
315 |                             return [types.TextContent(
316 |                                 type="text",
317 |                                 text="At least one of name or description is required."
318 |                             )]
319 | 
320 |                         spotify_client.change_playlist_details(
321 |                             playlist_id=arguments.get("playlist_id"),
322 |                             name=arguments.get("name"),
323 |                             description=arguments.get("description")
324 |                         )
325 |                         return [types.TextContent(
326 |                             type="text",
327 |                             text="Playlist details changed."
328 |                         )]
329 | 
330 |                     case "create":
331 |                         logger.info(f"Creating playlist with arguments: {arguments}")
332 |                         if not arguments.get("name"):
333 |                             logger.error("name is required for create action.")
334 |                             return [types.TextContent(
335 |                                 type="text",
336 |                                 text="name is required for create action."
337 |                             )]
338 |                         
339 |                         playlist = spotify_client.create_playlist(
340 |                             name=arguments.get("name"),
341 |                             description=arguments.get("description"),
342 |                             public=arguments.get("public", True)
343 |                         )
344 |                         return [types.TextContent(
345 |                             type="text",
346 |                             text=json.dumps(playlist, indent=2)
347 |                         )]
348 | 
349 |                     case _:
350 |                         return [types.TextContent(
351 |                             type="text",
352 |                             text=f"Unknown playlist action: {action}. "
353 |                                  "Supported actions are: get, get_tracks, add_tracks, remove_tracks, change_details, create."
354 |                         )]
355 |             case _:
356 |                 error_msg = f"Unknown tool: {name}"
357 |                 logger.error(error_msg)
358 |                 return [types.TextContent(
359 |                     type="text",
360 |                     text=error_msg
361 |                 )]
362 |     except SpotifyException as se:
363 |         error_msg = f"Spotify Client error occurred: {str(se)}"
364 |         logger.error(error_msg)
365 |         return [types.TextContent(
366 |             type="text",
367 |             text=f"An error occurred with the Spotify Client: {str(se)}"
368 |         )]
369 |     except Exception as e:
370 |         error_msg = f"Unexpected error occurred: {str(e)}"
371 |         logger.error(error_msg)
372 |         return [types.TextContent(
373 |             type="text",
374 |             text=error_msg
375 |         )]
376 | 
377 | 
378 | async def main():
379 |     try:
380 |         async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
381 |             await server.run(
382 |                 read_stream,
383 |                 write_stream,
384 |                 server.create_initialization_options()
385 |             )
386 |     except Exception as e:
387 |         logger.error(f"Server error occurred: {str(e)}")
388 |         raise
389 | 
```
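
The `add_tracks` and `remove_tracks` branches above both accept `track_ids` either as a list or as a JSON-encoded string and repeat the same decoding step. A minimal sketch of how that step could be factored out, assuming nothing beyond the standard library. The helper name `coerce_track_ids` is hypothetical and not part of this repository, and its element-type check is stricter than the original branches, which only decode the string.

```python
import json
from typing import Any, List, Optional


def coerce_track_ids(value: Any) -> Optional[List[str]]:
    """Return a list of track IDs, or None if the value is not usable.

    Hypothetical helper: accepts a Python list or a JSON array string.
    """
    if isinstance(value, str):
        try:
            # A JSON string such as '["id1", "id2"]' becomes a Python list.
            value = json.loads(value)
        except json.JSONDecodeError:
            return None
    # Stricter than the original code: require a list of strings.
    if isinstance(value, list) and all(isinstance(v, str) for v in value):
        return value
    return None


print(coerce_track_ids('["id1", "id2"]'))
print(coerce_track_ids("not json"))
```

A handler could call this once and return the "track_ids must be a list or a valid JSON array" message whenever the result is `None`.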