# Directory Structure
```
├── .gitignore
├── .python-version
├── LICENSE
├── media
│   └── take5.mp4
├── pyproject.toml
├── README.md
├── src
│   └── spotify_mcp
│       ├── __init__.py
│       ├── server.py
│       ├── spotify_api.py
│       └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.12
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | .cache
2 | .DS_Store
3 | __pycache__/
4 | .env
5 | .idea/
6 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # spotify-mcp MCP server
2 |
3 | MCP project to connect Claude with Spotify. Built on top of [spotipy-dev's API](https://github.com/spotipy-dev/spotipy/tree/2.24.0).
4 |
5 | ## Features
6 |
7 | - Start, pause, and skip playback
8 | - Search for tracks/albums/artists/playlists
9 | - Get info about a track/album/artist/playlist
10 | - Manage the Spotify queue
11 | - Manage, create, and update playlists
12 |
13 | ## Demo
14 |
15 | <details>
16 | <summary>
17 | Video -- turn on audio
18 | </summary>
19 | https://github.com/user-attachments/assets/20ee1f92-f3e3-4dfa-b945-ca57bc1e0894
20 | </details>
21 |
22 | ## Configuration
23 |
24 | ### Getting Spotify API Keys
25 |
26 | Create an account on [developer.spotify.com](https://developer.spotify.com/). Navigate to [the dashboard](https://developer.spotify.com/dashboard).
27 | Create an app with redirect_uri as http://127.0.0.1:8080/callback.
28 | You can choose any port you want but you must use http and an explicit loopback address (IPv4 or IPv6).
29 |
30 | See [here](https://developer.spotify.com/documentation/web-api/concepts/redirect_uri) for more info/troubleshooting.
31 | You may have to restart your MCP environment (e.g. Claude Desktop) once or twice before it works.
32 |
33 | ### Locating MCP Config
34 |
35 | For Cursor, Claude Desktop, or any other MCP-enabled client you will have to locate your config.
36 |
37 | - Claude Desktop location on macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
38 |
39 | - Claude Desktop location on Windows: `%APPDATA%/Claude/claude_desktop_config.json`
40 |
41 |
42 | ### Run this project with uvx
43 |
44 | Add this snippet to your MCP Config.
45 |
46 | ```json
47 | {
48 |   "mcpServers": {
49 |     "spotify": {
50 |       "command": "uvx",
51 |       "args": [
52 |         "--python", "3.12",
53 |         "--from", "git+https://github.com/varunneal/spotify-mcp",
54 |         "spotify-mcp"
55 |       ],
56 |       "env": {
57 |         "SPOTIFY_CLIENT_ID": "YOUR_CLIENT_ID",
58 |         "SPOTIFY_CLIENT_SECRET": "YOUR_CLIENT_SECRET",
59 |         "SPOTIFY_REDIRECT_URI": "http://127.0.0.1:8080/callback"
60 |       }
61 |     }
62 |   }
63 | }
64 | ```
65 |
66 | ### Run this project locally
67 |
68 | Running with `uvx` opens the Spotify redirect URI on every tool call. To avoid this, you can run this project locally by cloning this repo:
69 |
70 | ```bash
71 | git clone https://github.com/varunneal/spotify-mcp.git
72 | ```
73 |
74 | Add it to your MCP Config like this:
75 |
76 | ```json
77 | "spotify": {
78 |   "command": "uv",
79 |   "args": [
80 |     "--directory",
81 |     "/path/to/spotify-mcp",
82 |     "run",
83 |     "spotify-mcp"
84 |   ],
85 |   "env": {
86 |     "SPOTIFY_CLIENT_ID": "YOUR_CLIENT_ID",
87 |     "SPOTIFY_CLIENT_SECRET": "YOUR_CLIENT_SECRET",
88 |     "SPOTIFY_REDIRECT_URI": "http://127.0.0.1:8080/callback"
89 |   }
90 | }
91 | ```
92 |
93 | ### Troubleshooting
94 |
95 | Please open an issue if you can't get this MCP working. Here are some tips:
96 |
97 | 1. Make sure `uv` is updated. I recommend version `>=0.54`.
98 | 2. If cloning locally, enable execution permissions for the project: `chmod -R 755 /path/to/spotify-mcp`.
99 | 3. Ensure you have Spotify Premium (required to use the developer API).
100 |
101 | This MCP will emit logs to stderr (as specified in the MCP spec). On macOS, the Claude Desktop app should write these logs
102 | to `~/Library/Logs/Claude`.
103 | On other platforms [you can find logs here](https://modelcontextprotocol.io/quickstart/user#getting-logs-from-claude-for-desktop).
104 |
105 |
106 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
107 |
108 | ```bash
109 | npx @modelcontextprotocol/inspector uv --directory /path/to/spotify-mcp run spotify-mcp
110 | ```
111 |
112 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
113 |
114 | ## TODO
115 |
116 | Unfortunately, a bunch of cool features have [now been deprecated](https://techcrunch.com/2024/11/27/spotify-cuts-developer-access-to-several-of-its-recommendation-features/)
117 | from the Spotify API. Most new features will be relatively minor or for the health of the project:
118 |
119 | - tests.
120 | - ~~adding API support for managing playlists.~~
121 | - adding API support for paginated search results/playlists/albums.
122 |
123 | PRs appreciated! Thanks to @jamiew, @davidpadbury, @manncodes, @hyuma7, @aanurraj, @JJGO and others for contributions.
124 |
125 | [//]: # (## Deployment)
126 |
127 | [//]: # ((todo))
128 |
129 | [//]: # (### Building and Publishing)
130 |
131 | [//]: # ()
132 | [//]: # (To prepare the package for distribution:)
133 |
134 | [//]: # ()
135 | [//]: # (1. Sync dependencies and update lockfile:)
136 |
137 | [//]: # ()
138 | [//]: # (```bash)
139 |
140 | [//]: # (uv sync)
141 |
142 | [//]: # (```)
143 |
144 | [//]: # ()
145 | [//]: # (2. Build package distributions:)
146 |
147 | [//]: # ()
148 | [//]: # (```bash)
149 |
150 | [//]: # (uv build)
151 |
152 | [//]: # (```)
153 |
154 | [//]: # ()
155 | [//]: # (This will create source and wheel distributions in the `dist/` directory.)
156 |
157 | [//]: # ()
158 | [//]: # (3. Publish to PyPI:)
159 |
160 | [//]: # ()
161 | [//]: # (```bash)
162 |
163 | [//]: # (uv publish)
164 |
165 | [//]: # (```)
166 |
167 | [//]: # ()
168 | [//]: # (Note: You'll need to set PyPI credentials via environment variables or command flags:)
169 |
170 | [//]: # ()
171 | [//]: # (- Token: `--token` or `UV_PUBLISH_TOKEN`)
172 |
173 | [//]: # (- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`)
174 |
```
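The `env` values in the README's MCP config snippets have to be JSON strings. A minimal sketch, assuming a hypothetical helper `build_spotify_entry` (not part of this repository), of generating the uvx-based entry with Python's `json` module so that quoting is handled automatically:

```python
import json


def build_spotify_entry(client_id: str, client_secret: str,
                        redirect_uri: str = "http://127.0.0.1:8080/callback") -> dict:
    """Hypothetical helper (not in this repo): build the uvx-based server entry."""
    return {
        "mcpServers": {
            "spotify": {
                "command": "uvx",
                "args": [
                    "--python", "3.12",
                    "--from", "git+https://github.com/varunneal/spotify-mcp",
                    "spotify-mcp",
                ],
                "env": {
                    "SPOTIFY_CLIENT_ID": client_id,
                    "SPOTIFY_CLIENT_SECRET": client_secret,
                    "SPOTIFY_REDIRECT_URI": redirect_uri,
                },
            }
        }
    }


if __name__ == "__main__":
    # Placeholder credentials; substitute the values from the Spotify dashboard.
    print(json.dumps(build_spotify_entry("YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET"), indent=2))
```

The printed object can be merged into an existing client config by hand; the redirect URI must match the one registered for the Spotify app.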
--------------------------------------------------------------------------------
/src/spotify_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from . import server
2 | import asyncio
3 |
4 | def main():
5 |     """Main entry point for the package."""
6 |     asyncio.run(server.main())
7 |
8 | # Optionally expose other important items at package level
9 | __all__ = ['main', 'server']
10 |
11 |
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "spotify-mcp"
3 | version = "0.2.0"
4 | description = "MCP spotify project"
5 | readme = "README.md"
6 | requires-python = ">=3.12"
7 | dependencies = [
8 |     "mcp==1.3.0",
9 |     "python-dotenv>=1.0.1",
10 |     "spotipy==2.24.0",
11 | ]
12 | [[project.authors]]
13 | name = "Varun Srivastava"
14 | email = "[email protected]"
15 |
16 | [build-system]
17 | requires = [ "hatchling",]
18 | build-backend = "hatchling.build"
19 |
20 | [dependency-groups]
21 | dev = [
22 | ]
23 |
24 | [tool.uv.sources]
25 | spotify-mcp = { workspace = true }
26 |
27 | [project.scripts]
28 | spotify-mcp = "spotify_mcp:main"
29 |
```
--------------------------------------------------------------------------------
/src/spotify_mcp/utils.py:
--------------------------------------------------------------------------------
```python
1 | from collections import defaultdict
2 | from typing import Optional, Dict
3 | import functools
4 | from typing import Callable, TypeVar
5 | from typing import Optional, Dict
6 | from urllib.parse import quote, urlparse, urlunparse
7 |
8 | from requests import RequestException
9 |
10 | T = TypeVar('T')
11 |
12 |
13 | def normalize_redirect_uri(url: str) -> str:
14 |     if not url:
15 |         return url
16 |
17 |     parsed = urlparse(url)
18 |
19 |     # Convert localhost to 127.0.0.1
20 |     if parsed.netloc == 'localhost' or parsed.netloc.startswith('localhost:'):
21 |         port = ''
22 |         if ':' in parsed.netloc:
23 |             port = ':' + parsed.netloc.split(':')[1]
24 |         parsed = parsed._replace(netloc=f'127.0.0.1{port}')
25 |
26 |     return urlunparse(parsed)
27 |
28 | def parse_track(track_item: dict, detailed=False) -> Optional[dict]:
29 |     if not track_item:
30 |         return None
31 |     narrowed_item = {
32 |         'name': track_item['name'],
33 |         'id': track_item['id'],
34 |     }
35 |
36 |     if 'is_playing' in track_item:
37 |         narrowed_item['is_playing'] = track_item['is_playing']
38 |
39 |     if detailed:
40 |         narrowed_item['album'] = parse_album(track_item.get('album'))
41 |         for k in ['track_number', 'duration_ms']:
42 |             narrowed_item[k] = track_item.get(k)
43 |
44 |     if not track_item.get('is_playable', True):
45 |         narrowed_item['is_playable'] = False
46 |
47 |     artists = [a['name'] for a in track_item['artists']]
48 |     if detailed:
49 |         artists = [parse_artist(a) for a in track_item['artists']]
50 |
51 |     if len(artists) == 1:
52 |         narrowed_item['artist'] = artists[0]
53 |     else:
54 |         narrowed_item['artists'] = artists
55 |
56 |     return narrowed_item
57 |
58 |
59 | def parse_artist(artist_item: dict, detailed=False) -> Optional[dict]:
60 |     if not artist_item:
61 |         return None
62 |     narrowed_item = {
63 |         'name': artist_item['name'],
64 |         'id': artist_item['id'],
65 |     }
66 |     if detailed:
67 |         narrowed_item['genres'] = artist_item.get('genres')
68 |
69 |     return narrowed_item
70 |
71 |
72 | def parse_playlist(playlist_item: dict, username, detailed=False) -> Optional[dict]:
73 |     if not playlist_item:
74 |         return None
75 |     narrowed_item = {
76 |         'name': playlist_item['name'],
77 |         'id': playlist_item['id'],
78 |         'owner': playlist_item['owner']['display_name'],
79 |         'user_is_owner': playlist_item['owner']['display_name'] == username,
80 |         'total_tracks': playlist_item['tracks']['total'],
81 |     }
82 |     if detailed:
83 |         narrowed_item['description'] = playlist_item.get('description')
84 |         tracks = []
85 |         for t in playlist_item['tracks']['items']:
86 |             tracks.append(parse_track(t['track']))
87 |         narrowed_item['tracks'] = tracks
88 |
89 |     return narrowed_item
90 |
91 |
92 | def parse_album(album_item: dict, detailed=False) -> dict:
93 |     narrowed_item = {
94 |         'name': album_item['name'],
95 |         'id': album_item['id'],
96 |     }
97 |
98 |     artists = [a['name'] for a in album_item['artists']]
99 |
100 |     if detailed:
101 |         tracks = []
102 |         for t in album_item['tracks']['items']:
103 |             tracks.append(parse_track(t))
104 |         narrowed_item["tracks"] = tracks
105 |         artists = [parse_artist(a) for a in album_item['artists']]
106 |
107 |         for k in ['total_tracks', 'release_date', 'genres']:
108 |             narrowed_item[k] = album_item.get(k)
109 |
110 |     if len(artists) == 1:
111 |         narrowed_item['artist'] = artists[0]
112 |     else:
113 |         narrowed_item['artists'] = artists
114 |
115 |     return narrowed_item
116 |
117 |
118 | def parse_search_results(results: Dict, qtype: str, username: Optional[str] = None):
119 |     _results = defaultdict(list)
120 |     # potential
121 |     # if username:
122 |     #     _results['User Spotify URI'] = username
123 |
124 |     for q in qtype.split(","):
125 |         match q:
126 |             case "track":
127 |                 for idx, item in enumerate(results['tracks']['items']):
128 |                     if not item: continue
129 |                     _results['tracks'].append(parse_track(item))
130 |             case "artist":
131 |                 for idx, item in enumerate(results['artists']['items']):
132 |                     if not item: continue
133 |                     _results['artists'].append(parse_artist(item))
134 |             case "playlist":
135 |                 for idx, item in enumerate(results['playlists']['items']):
136 |                     if not item: continue
137 |                     _results['playlists'].append(parse_playlist(item, username))
138 |             case "album":
139 |                 for idx, item in enumerate(results['albums']['items']):
140 |                     if not item: continue
141 |                     _results['albums'].append(parse_album(item))
142 |             case _:
143 |                 raise ValueError(f"Unknown qtype {qtype}")
144 |
145 |     return dict(_results)
146 |
147 | def parse_tracks(items: list) -> list:
148 |     """
149 |     Parse a list of track items and return a list of parsed tracks.
150 |
151 |     Args:
152 |         items: List of track items
153 |     Returns:
154 |         List of parsed tracks
155 |     """
156 |     tracks = []
157 |     for idx, item in enumerate(items):
158 |         if not item:
159 |             continue
160 |         tracks.append(parse_track(item['track']))
161 |     return tracks
162 |
163 |
164 | def build_search_query(base_query: str,
165 |                        artist: Optional[str] = None,
166 |                        track: Optional[str] = None,
167 |                        album: Optional[str] = None,
168 |                        year: Optional[str] = None,
169 |                        year_range: Optional[tuple[int, int]] = None,
170 |                        # upc: Optional[str] = None,
171 |                        # isrc: Optional[str] = None,
172 |                        genre: Optional[str] = None,
173 |                        is_hipster: bool = False,
174 |                        is_new: bool = False
175 |                        ) -> str:
176 |     """
177 |     Build a search query string with optional filters.
178 |
179 |     Args:
180 |         base_query: Base search term
181 |         artist: Artist name filter
182 |         track: Track name filter
183 |         album: Album name filter
184 |         year: Specific year filter
185 |         year_range: Tuple of (start_year, end_year) for year range filter
186 |         genre: Genre filter
187 |         is_hipster: Filter for lowest 10% popularity albums
188 |         is_new: Filter for albums released in past two weeks
189 |
190 |     Returns:
191 |         Encoded query string with applied filters
192 |     """
193 |     filters = []
194 |
195 |     if artist:
196 |         filters.append(f"artist:{artist}")
197 |     if track:
198 |         filters.append(f"track:{track}")
199 |     if album:
200 |         filters.append(f"album:{album}")
201 |     if year:
202 |         filters.append(f"year:{year}")
203 |     if year_range:
204 |         filters.append(f"year:{year_range[0]}-{year_range[1]}")
205 |     if genre:
206 |         filters.append(f"genre:{genre}")
207 |     if is_hipster:
208 |         filters.append("tag:hipster")
209 |     if is_new:
210 |         filters.append("tag:new")
211 |
212 |     query_parts = [base_query] + filters
213 |     return quote(" ".join(query_parts))
214 |
215 |
216 | def validate(func: Callable[..., T]) -> Callable[..., T]:
217 |     """
218 |     Decorator for Spotify API methods that handles authentication and device validation.
219 |     - Checks and refreshes authentication if needed
220 |     - Validates active device and retries with candidate device if needed
221 |     """
222 |
223 |     @functools.wraps(func)
224 |     def wrapper(self, *args, **kwargs):
225 |         # Handle authentication
226 |         if not self.auth_ok():
227 |             self.auth_refresh()
228 |
229 |         # Handle device validation
230 |         if not self.is_active_device():
231 |             kwargs['device'] = self._get_candidate_device()
232 |
233 |         # TODO: try-except RequestException
234 |         return func(self, *args, **kwargs)
235 |
236 |     return wrapper
237 |
238 | def ensure_username(func):
239 |     """
240 |     Decorator to ensure that the username is set before calling the function.
241 |     """
242 |     @functools.wraps(func)
243 |     def wrapper(self, *args, **kwargs):
244 |         if self.username is None:
245 |             self.set_username()
246 |         return func(self, *args, **kwargs)
247 |     return wrapper
248 |
```
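The `validate` decorator in `utils.py` refreshes auth when needed and injects a fallback `device` keyword when no device is active. A minimal sketch of that behaviour, assuming a hypothetical `FakeClient` stand-in (not part of this repository) that makes no network calls; the decorator body is repeated only so the snippet runs on its own:

```python
import functools
from typing import Callable, TypeVar

T = TypeVar("T")


def validate(func: Callable[..., T]) -> Callable[..., T]:
    """Same logic as the decorator in utils.py, repeated so the sketch runs alone."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.auth_ok():
            self.auth_refresh()
        if not self.is_active_device():
            kwargs["device"] = self._get_candidate_device()
        return func(self, *args, **kwargs)
    return wrapper


class FakeClient:
    """Hypothetical stand-in for spotify_api.Client; makes no network calls."""

    def __init__(self, token_valid: bool, devices: list[dict]):
        self.token_valid = token_valid
        self.devices = devices
        self.refreshed = False

    def auth_ok(self) -> bool:
        return self.token_valid

    def auth_refresh(self) -> None:
        self.refreshed = True
        self.token_valid = True

    def is_active_device(self) -> bool:
        return any(d.get("is_active") for d in self.devices)

    def _get_candidate_device(self) -> dict:
        if not self.devices:
            raise ConnectionError("No active device. Is Spotify open?")
        return self.devices[0]

    @validate
    def pause_playback(self, device=None):
        # Return the device id the real client would pass on to spotipy.
        return device.get("id") if device else None
```

Because the wrapper injects `device` as a keyword argument, any method decorated this way needs to accept a `device` parameter, which is why the decorated methods in `spotify_api.py` all declare `device=None`.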
--------------------------------------------------------------------------------
/src/spotify_mcp/spotify_api.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import os
3 | from typing import Optional, Dict, List
4 |
5 | import spotipy
6 | from dotenv import load_dotenv
7 | from spotipy.cache_handler import CacheFileHandler
8 | from spotipy.oauth2 import SpotifyOAuth
9 |
10 | from . import utils
11 |
12 | load_dotenv()
13 |
14 | CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
15 | CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
16 | REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI")
17 |
18 | # Normalize the redirect URI to meet Spotify's requirements
19 | if REDIRECT_URI:
20 |     REDIRECT_URI = utils.normalize_redirect_uri(REDIRECT_URI)
21 |
22 | SCOPES = ["user-modify-playback-state", "user-read-playback-state", "user-read-currently-playing",  # spotify connect
23 |           "app-remote-control", "streaming",  # playback
24 |           "playlist-read-private", "playlist-read-collaborative", "playlist-modify-private", "playlist-modify-public",
25 |           # playlists
26 |           "user-read-playback-position", "user-top-read", "user-read-recently-played",  # listening history
27 |           "user-library-modify", "user-library-read",  # library
28 |           ]
29 |
30 |
31 | class Client:
32 |     def __init__(self, logger: logging.Logger):
33 |         """Initialize Spotify client with necessary permissions"""
34 |         self.logger = logger
35 |
36 |         scope = "user-library-read,user-read-playback-state,user-modify-playback-state,user-read-currently-playing,playlist-read-private,playlist-read-collaborative,playlist-modify-private,playlist-modify-public"
37 |
38 |         try:
39 |             self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
40 |                 scope=scope,
41 |                 client_id=CLIENT_ID,
42 |                 client_secret=CLIENT_SECRET,
43 |                 redirect_uri=REDIRECT_URI))
44 |
45 |             self.auth_manager: SpotifyOAuth = self.sp.auth_manager
46 |             self.cache_handler: CacheFileHandler = self.auth_manager.cache_handler
47 |         except Exception as e:
48 |             self.logger.error(f"Failed to initialize Spotify client: {str(e)}")
49 |             raise
50 |
51 |         self.username = None
52 |
53 |     @utils.validate
54 |     def set_username(self, device=None):
55 |         self.username = self.sp.current_user()['display_name']
56 |
57 |     @utils.validate
58 |     def search(self, query: str, qtype: str = 'track', limit=10, device=None):
59 |         """
60 |         Searches based on the query term.
61 |         - query: query term
62 |         - qtype: the types of items to return. One or more of 'artist', 'album', 'track', 'playlist'.
63 |                  If multiple types are desired, pass in a comma separated string; e.g. 'track,album'
64 |         - limit: max # items to return
65 |         """
66 |         if self.username is None:
67 |             self.set_username()
68 |         results = self.sp.search(q=query, limit=limit, type=qtype)
69 |         if not results:
70 |             raise ValueError("No search results found.")
71 |         return utils.parse_search_results(results, qtype, self.username)
72 |
73 |     def recommendations(self, artists: Optional[List] = None, tracks: Optional[List] = None, limit=20):
74 |         # Doesn't work: Spotify has deprecated its recommendation features (see the README's TODO section).
75 |         recs = self.sp.recommendations(seed_artists=artists, seed_tracks=tracks, limit=limit)
76 |         return recs
77 |
78 |     def get_info(self, item_uri: str) -> dict:
79 |         """
80 |         Returns more info about item.
81 |         - item_uri: uri. Looks like 'spotify:track:xxxxxx', 'spotify:album:xxxxxx', etc.
82 |         """
83 |         _, qtype, item_id = item_uri.split(":")
84 |         match qtype:
85 |             case 'track':
86 |                 return utils.parse_track(self.sp.track(item_id), detailed=True)
87 |             case 'album':
88 |                 album_info = utils.parse_album(self.sp.album(item_id), detailed=True)
89 |                 return album_info
90 |             case 'artist':
91 |                 artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True)
92 |                 albums = self.sp.artist_albums(item_id)
93 |                 top_tracks = self.sp.artist_top_tracks(item_id)['tracks']
94 |                 albums_and_tracks = {
95 |                     'albums': albums,
96 |                     'tracks': {'items': top_tracks}
97 |                 }
98 |                 parsed_info = utils.parse_search_results(albums_and_tracks, qtype="album,track")
99 |                 artist_info['top_tracks'] = parsed_info['tracks']
100 |                 artist_info['albums'] = parsed_info['albums']
101 |
102 |                 return artist_info
103 |             case 'playlist':
104 |                 if self.username is None:
105 |                     self.set_username()
106 |                 playlist = self.sp.playlist(item_id)
107 |                 self.logger.info(f"playlist info is {playlist}")
108 |                 playlist_info = utils.parse_playlist(playlist, self.username, detailed=True)
109 |
110 |                 return playlist_info
111 |
112 |         raise ValueError(f"Unknown qtype {qtype}")
113 |
114 |     def get_current_track(self) -> Optional[Dict]:
115 |         """Get information about the currently playing track"""
116 |         try:
117 |             # current_playback vs current_user_playing_track?
118 |             current = self.sp.current_user_playing_track()
119 |             if not current:
120 |                 self.logger.info("No playback session found")
121 |                 return None
122 |             if current.get('currently_playing_type') != 'track':
123 |                 self.logger.info("Current playback is not a track")
124 |                 return None
125 |
126 |             track_info = utils.parse_track(current['item'])
127 |             if 'is_playing' in current:
128 |                 track_info['is_playing'] = current['is_playing']
129 |
130 |             self.logger.info(
131 |                 f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}")
132 |             return track_info
133 |         except Exception as e:
134 |             self.logger.error(f"Error getting current track info: {str(e)}")
135 |             raise
136 |
137 |     @utils.validate
138 |     def start_playback(self, spotify_uri=None, device=None):
139 |         """
140 |         Starts spotify playback of uri. If spotify_uri is omitted, resumes current playback.
141 |         - spotify_uri: ID of resource to play, or None. Typically looks like 'spotify:track:xxxxxx' or 'spotify:album:xxxxxx'.
142 |         """
143 |         try:
144 |             self.logger.info(f"Starting playback for spotify_uri: {spotify_uri} on {device}")
145 |             if not spotify_uri:
146 |                 if self.is_track_playing():
147 |                     self.logger.info("No track_id provided and playback already active.")
148 |                     return
149 |                 if not self.get_current_track():
150 |                     raise ValueError("No track_id provided and no current playback to resume.")
151 |
152 |             if spotify_uri is not None:
153 |                 if spotify_uri.startswith('spotify:track:'):
154 |                     uris = [spotify_uri]
155 |                     context_uri = None
156 |                 else:
157 |                     uris = None
158 |                     context_uri = spotify_uri
159 |             else:
160 |                 uris = None
161 |                 context_uri = None
162 |
163 |             device_id = device.get('id') if device else None
164 |
165 |             self.logger.info(f"Starting playback on {device}: context_uri={context_uri}, uris={uris}")
166 |             result = self.sp.start_playback(uris=uris, context_uri=context_uri, device_id=device_id)
167 |             self.logger.info(f"Playback result: {result}")
168 |             return result
169 |         except Exception as e:
170 |             self.logger.error(f"Error starting playback: {str(e)}.")
171 |             raise
172 |
173 |     @utils.validate
174 |     def pause_playback(self, device=None):
175 |         """Pauses playback."""
176 |         playback = self.sp.current_playback()
177 |         if playback and playback.get('is_playing'):
178 |             self.sp.pause_playback(device.get('id') if device else None)
179 |
180 |     @utils.validate
181 |     def add_to_queue(self, track_id: str, device=None):
182 |         """
183 |         Adds track to queue.
184 |         - track_id: ID of track to play.
185 |         """
186 |         self.sp.add_to_queue(track_id, device.get('id') if device else None)
187 |
188 |     @utils.validate
189 |     def get_queue(self, device=None):
190 |         """Returns the current queue of tracks."""
191 |         queue_info = self.sp.queue()
192 |         queue_info['currently_playing'] = self.get_current_track()
193 |
194 |         queue_info['queue'] = [utils.parse_track(track) for track in queue_info.pop('queue')]
195 |
196 |         return queue_info
197 |
198 |     def get_liked_songs(self):
199 |         # todo
200 |         results = self.sp.current_user_saved_tracks()
201 |         for idx, item in enumerate(results['items']):
202 |             track = item['track']
203 |             print(idx, track['artists'][0]['name'], " – ", track['name'])
204 |
205 |     def is_track_playing(self) -> bool:
206 |         """Returns if a track is actively playing."""
207 |         curr_track = self.get_current_track()
208 |         if not curr_track:
209 |             return False
210 |         if curr_track.get('is_playing'):
211 |             return True
212 |         return False
213 |
214 |     def get_current_user_playlists(self, limit=50) -> List[Dict]:
215 |         """
216 |         Get current user's playlists.
217 |         - limit: Max number of playlists to return.
218 |         """
219 |         playlists = self.sp.current_user_playlists(limit=limit)
220 |         if not playlists:
221 |             raise ValueError("No playlists found.")
222 |         return [utils.parse_playlist(playlist, self.username) for playlist in playlists['items']]
223 |
224 |     @utils.ensure_username
225 |     def get_playlist_tracks(self, playlist_id: str, limit=50) -> List[Dict]:
226 |         """
227 |         Get tracks from a playlist.
228 |         - playlist_id: ID of the playlist to get tracks from.
229 |         - limit: Max number of tracks to return.
230 |         """
231 |         playlist = self.sp.playlist(playlist_id)
232 |         if not playlist:
233 |             raise ValueError("No playlist found.")
234 |         return utils.parse_tracks(playlist['tracks']['items'][:limit])
235 |
236 |     @utils.ensure_username
237 |     def add_tracks_to_playlist(self, playlist_id: str, track_ids: List[str], position: Optional[int] = None):
238 |         """
239 |         Add tracks to a playlist.
240 |         - playlist_id: ID of the playlist to modify.
241 |         - track_ids: List of track IDs to add.
242 |         - position: Position to insert the tracks at (optional).
243 |         """
244 |         if not playlist_id:
245 |             raise ValueError("No playlist ID provided.")
246 |         if not track_ids:
247 |             raise ValueError("No track IDs provided.")
248 |
249 |         try:
250 |             response = self.sp.playlist_add_items(playlist_id, track_ids, position=position)
251 |             self.logger.info(f"Response from adding tracks: {track_ids} to playlist {playlist_id}: {response}")
252 |         except Exception as e:
253 |             self.logger.error(f"Error adding tracks to playlist: {str(e)}")
254 |
255 |     @utils.ensure_username
256 |     def remove_tracks_from_playlist(self, playlist_id: str, track_ids: List[str]):
257 |         """
258 |         Remove tracks from a playlist.
259 |         - playlist_id: ID of the playlist to modify.
260 |         - track_ids: List of track IDs to remove.
261 |         """
262 |         if not playlist_id:
263 |             raise ValueError("No playlist ID provided.")
264 |         if not track_ids:
265 |             raise ValueError("No track IDs provided.")
266 |
267 |         try:
268 |             response = self.sp.playlist_remove_all_occurrences_of_items(playlist_id, track_ids)
269 |             self.logger.info(f"Response from removing tracks: {track_ids} from playlist {playlist_id}: {response}")
270 |         except Exception as e:
271 |             self.logger.error(f"Error removing tracks from playlist: {str(e)}")
272 |
273 |     @utils.ensure_username
274 |     def create_playlist(self, name: str, description: Optional[str] = None, public: bool = True):
275 |         """
276 |         Create a new playlist.
277 |         - name: Name for the playlist.
278 |         - description: Description for the playlist.
279 |         - public: Whether the playlist should be public.
280 |         """
281 |         if not name:
282 |             raise ValueError("Playlist name is required.")
283 |
284 |         try:
285 |             user = self.sp.current_user()
286 |             user_id = user['id']
287 |
288 |             playlist = self.sp.user_playlist_create(
289 |                 user=user_id,
290 |                 name=name,
291 |                 public=public,
292 |                 description=description
293 |             )
294 |             self.logger.info(f"Created playlist: {name} (ID: {playlist['id']})")
295 |             return utils.parse_playlist(playlist, self.username, detailed=True)
296 |         except Exception as e:
297 |             self.logger.error(f"Error creating playlist: {str(e)}")
298 |             raise
299 |
300 |     @utils.ensure_username
301 |     def change_playlist_details(self, playlist_id: str, name: Optional[str] = None, description: Optional[str] = None):
302 |         """
303 |         Change playlist details.
304 |         - playlist_id: ID of the playlist to modify.
305 |         - name: New name for the playlist (optional).
306 |         - description: New description for the playlist (optional).
307 |         Arguments left as None are not changed.
308 |         """
309 |         if not playlist_id:
310 |             raise ValueError("No playlist ID provided.")
311 |
312 |         try:
313 |             response = self.sp.playlist_change_details(playlist_id, name=name, description=description)
314 |             self.logger.info(f"Response from changing playlist details: {response}")
315 |         except Exception as e:
316 |             self.logger.error(f"Error changing playlist details: {str(e)}")
317 |
318 |     def get_devices(self) -> List[Dict]:
319 |         return self.sp.devices()['devices']
320 |
321 |     def is_active_device(self):
322 |         return any(device.get('is_active') for device in self.get_devices())
323 |
324 |     def _get_candidate_device(self):
325 |         devices = self.get_devices()
326 |         if not devices:
327 |             raise ConnectionError("No active device. Is Spotify open?")
328 |         for device in devices:
329 |             if device.get('is_active'):
330 |                 return device
331 |         self.logger.info(f"No active device, assigning {devices[0]['name']}.")
332 |         return devices[0]
333 |
334 |     def auth_ok(self) -> bool:
335 |         try:
336 |             token = self.cache_handler.get_cached_token()
337 |             if token is None:
338 |                 self.logger.info("Auth check result: no token exists")
339 |                 return False
340 |
341 |             is_expired = self.auth_manager.is_token_expired(token)
342 |             self.logger.info(f"Auth check result: {'valid' if not is_expired else 'expired'}")
343 |             return not is_expired  # Return True if token is NOT expired
344 |         except Exception as e:
345 |             self.logger.error(f"Error checking auth status: {str(e)}")
346 |             return False  # Return False on error rather than raising
347 |
348 |     def auth_refresh(self):
349 |         self.auth_manager.validate_token(self.cache_handler.get_cached_token())
350 |
351 |     def skip_track(self, n=1):
352 |         # todo: Better error handling
353 |         for _ in range(n):
354 |             self.sp.next_track()
355 |
356 |     def previous_track(self):
357 |         self.sp.previous_track()
358 |
359 |     def seek_to_position(self, position_ms):
360 |         self.sp.seek_track(position_ms=position_ms)
361 |
362 |     def set_volume(self, volume_percent):
363 |         self.sp.volume(volume_percent)
364 |
```
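`Client.start_playback` passes track URIs to spotipy as a `uris` list and every other URI as a `context_uri`. A minimal sketch of that branch in isolation, assuming a hypothetical helper name `playback_arguments` (not part of this repository):

```python
from typing import Optional


def playback_arguments(spotify_uri: Optional[str]) -> dict:
    """Hypothetical helper mirroring the URI branch in Client.start_playback."""
    if spotify_uri is None:
        # Nothing to play explicitly: resume whatever is current.
        return {"uris": None, "context_uri": None}
    if spotify_uri.startswith("spotify:track:"):
        # Individual tracks are passed as a list of URIs.
        return {"uris": [spotify_uri], "context_uri": None}
    # Albums, playlists, and artists are played as a context.
    return {"uris": None, "context_uri": spotify_uri}
```

Keeping the branch in a pure function like this would make it testable without a Spotify session; the real method additionally resolves `device_id` and logs the call.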
--------------------------------------------------------------------------------
/src/spotify_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | import asyncio
2 | import base64
3 | import os
4 | import logging
5 | import sys
6 | from enum import Enum
7 | import json
8 | from typing import List, Optional, Tuple
9 | from datetime import datetime
10 | from pathlib import Path
11 |
12 | import mcp.types as types
13 | from mcp.server import NotificationOptions, Server # , stdio_server
14 | import mcp.server.stdio
15 | from pydantic import BaseModel, Field, AnyUrl
16 | from spotipy import SpotifyException
17 |
18 | from . import spotify_api
19 | from .utils import normalize_redirect_uri
20 |
21 |
22 | def setup_logger():
23 |     class Logger:
24 |         def info(self, message):
25 |             print(f"[INFO] {message}", file=sys.stderr)
26 |
27 |         def error(self, message):
28 |             print(f"[ERROR] {message}", file=sys.stderr)
29 |
30 |     return Logger()
31 |
32 |
33 | logger = setup_logger()
34 | # Normalize the redirect URI to meet Spotify's requirements
35 | if spotify_api.REDIRECT_URI:
36 |     spotify_api.REDIRECT_URI = normalize_redirect_uri(spotify_api.REDIRECT_URI)
37 | spotify_client = spotify_api.Client(logger)
38 |
39 | server = Server("spotify-mcp")
40 |
41 |
42 | # options =
43 | class ToolModel(BaseModel):
44 |     @classmethod
45 |     def as_tool(cls):
46 |         return types.Tool(
47 |             name="Spotify" + cls.__name__,
48 |             description=cls.__doc__,
49 |             inputSchema=cls.model_json_schema()
50 |         )
51 |
52 |
53 | class Playback(ToolModel):
54 | """Manages the current playback with the following actions:
55 | - get: Get information about user's current track.
56 | - start: Starts playing new item or resumes current playback if called with no uri.
57 | - pause: Pauses current playback.
58 | - skip: Skips current track.
59 | """
60 | action: str = Field(description="Action to perform: 'get', 'start', 'pause' or 'skip'.")
61 | spotify_uri: Optional[str] = Field(default=None, description="Spotify uri of item to play for 'start' action. " +
62 | "If omitted, resumes current playback.")
63 | num_skips: Optional[int] = Field(default=1, description="Number of tracks to skip for `skip` action.")
64 |
65 |
66 | class Queue(ToolModel):
67 | """Manage the playback queue - get the queue or add tracks."""
68 | action: str = Field(description="Action to perform: 'add' or 'get'.")
69 | track_id: Optional[str] = Field(default=None, description="Track ID to add to queue (required for add action)")
70 |
71 |
72 | class GetInfo(ToolModel):
73 | """Get detailed information about a Spotify item (track, album, artist, or playlist)."""
74 | item_uri: str = Field(description="URI of the item to get information about. " +
75 | "If 'playlist' or 'album', returns its tracks. " +
76 | "If 'artist', returns albums and top tracks.")
77 |
78 |
79 | class Search(ToolModel):
80 | """Search for tracks, albums, artists, or playlists on Spotify."""
81 | query: str = Field(description="query term")
82 | qtype: Optional[str] = Field(default="track",
83 | description="Type of items to search for (track, album, artist, playlist, " +
84 | "or comma-separated combination)")
85 | limit: Optional[int] = Field(default=10, description="Maximum number of items to return")
86 |
87 |
88 | class Playlist(ToolModel):
89 | """Manage Spotify playlists.
90 | - get: Get a list of user's playlists.
91 | - get_tracks: Get tracks in a specific playlist.
92 | - add_tracks: Add tracks to a specific playlist.
93 | - remove_tracks: Remove tracks from a specific playlist.
94 | - change_details: Change details of a specific playlist.
95 | - create: Create a new playlist.
96 | """
97 | action: str = Field(
98 | description="Action to perform: 'get', 'get_tracks', 'add_tracks', 'remove_tracks', 'change_details', 'create'.")
99 | playlist_id: Optional[str] = Field(default=None, description="ID of the playlist to manage.")
100 | track_ids: Optional[List[str]] = Field(default=None, description="List of track IDs to add/remove.")
101 | name: Optional[str] = Field(default=None, description="Name for the playlist (required for create; for change_details, supply name and/or description).")
102 | description: Optional[str] = Field(default=None, description="Description for the playlist.")
103 | public: Optional[bool] = Field(default=True, description="Whether the playlist should be public (for create action).")
104 |
105 |
106 | @server.list_prompts()
107 | async def handle_list_prompts() -> list[types.Prompt]:
108 | return []
109 |
110 |
111 | @server.list_resources()
112 | async def handle_list_resources() -> list[types.Resource]:
113 | return []
114 |
115 |
116 | @server.list_tools()
117 | async def handle_list_tools() -> list[types.Tool]:
118 | """List available tools."""
119 | logger.info("Listing available tools")
120 | # await server.request_context.session.send_notification("are you receiving this notification?")
121 | tools = [
122 | Playback.as_tool(),
123 | Search.as_tool(),
124 | Queue.as_tool(),
125 | GetInfo.as_tool(),
126 | Playlist.as_tool(),
127 | ]
128 | logger.info(f"Available tools: {[tool.name for tool in tools]}")
129 | return tools
130 |
131 |
132 | @server.call_tool()
133 | async def handle_call_tool(
134 | name: str, arguments: dict | None
135 | ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
136 | """Handle tool execution requests."""
137 | logger.info(f"Tool called: {name} with arguments: {arguments}")
138 | assert name[:7] == "Spotify", f"Unknown tool: {name}"
139 | try:
140 | match name[7:]:
141 | case "Playback":
142 | action = arguments.get("action")
143 | match action:
144 | case "get":
145 | logger.info("Attempting to get current track")
146 | curr_track = spotify_client.get_current_track()
147 | if curr_track:
148 | logger.info(f"Current track retrieved: {curr_track.get('name', 'Unknown')}")
149 | return [types.TextContent(
150 | type="text",
151 | text=json.dumps(curr_track, indent=2)
152 | )]
153 | logger.info("No track currently playing")
154 | return [types.TextContent(
155 | type="text",
156 | text="No track playing."
157 | )]
158 | case "start":
159 | logger.info(f"Starting playback with arguments: {arguments}")
160 | spotify_client.start_playback(spotify_uri=arguments.get("spotify_uri"))
161 | logger.info("Playback started successfully")
162 | return [types.TextContent(
163 | type="text",
164 | text="Playback starting."
165 | )]
166 | case "pause":
167 | logger.info("Attempting to pause playback")
168 | spotify_client.pause_playback()
169 | logger.info("Playback paused successfully")
170 | return [types.TextContent(
171 | type="text",
172 | text="Playback paused."
173 | )]
174 | case "skip":
175 | num_skips = int(arguments.get("num_skips", 1))
176 | logger.info(f"Skipping {num_skips} tracks.")
177 | spotify_client.skip_track(n=num_skips)
178 | return [types.TextContent(
179 | type="text",
180 | text=f"Skipped {num_skips} track(s)."
181 | )]
182 |
183 | case "Search":
184 | logger.info(f"Performing search with arguments: {arguments}")
185 | search_results = spotify_client.search(
186 | query=arguments.get("query", ""),
187 | qtype=arguments.get("qtype", "track"),
188 | limit=arguments.get("limit", 10)
189 | )
190 | logger.info("Search completed successfully.")
191 | return [types.TextContent(
192 | type="text",
193 | text=json.dumps(search_results, indent=2)
194 | )]
195 |
196 | case "Queue":
197 | logger.info(f"Queue operation with arguments: {arguments}")
198 | action = arguments.get("action")
199 |
200 | match action:
201 | case "add":
202 | track_id = arguments.get("track_id")
203 | if not track_id:
204 | logger.error("track_id is required for add to queue.")
205 | return [types.TextContent(
206 | type="text",
207 | text="track_id is required for add action"
208 | )]
209 | spotify_client.add_to_queue(track_id)
210 | return [types.TextContent(
211 | type="text",
212 | text="Track added to queue."
213 | )]
214 |
215 | case "get":
216 | queue = spotify_client.get_queue()
217 | return [types.TextContent(
218 | type="text",
219 | text=json.dumps(queue, indent=2)
220 | )]
221 |
222 | case _:
223 | return [types.TextContent(
224 | type="text",
225 | text=f"Unknown queue action: {action}. Supported actions are: add and get."
226 | )]
227 |
228 | case "GetInfo":
229 | logger.info(f"Getting item info with arguments: {arguments}")
230 | item_info = spotify_client.get_info(
231 | item_uri=arguments.get("item_uri")
232 | )
233 | return [types.TextContent(
234 | type="text",
235 | text=json.dumps(item_info, indent=2)
236 | )]
237 |
238 | case "Playlist":
239 | logger.info(f"Playlist operation with arguments: {arguments}")
240 | action = arguments.get("action")
241 | match action:
242 | case "get":
243 | logger.info(f"Getting current user's playlists with arguments: {arguments}")
244 | playlists = spotify_client.get_current_user_playlists()
245 | return [types.TextContent(
246 | type="text",
247 | text=json.dumps(playlists, indent=2)
248 | )]
249 | case "get_tracks":
250 | logger.info(f"Getting tracks in playlist with arguments: {arguments}")
251 | if not arguments.get("playlist_id"):
252 | logger.error("playlist_id is required for get_tracks action.")
253 | return [types.TextContent(
254 | type="text",
255 | text="playlist_id is required for get_tracks action."
256 | )]
257 | tracks = spotify_client.get_playlist_tracks(arguments.get("playlist_id"))
258 | return [types.TextContent(
259 | type="text",
260 | text=json.dumps(tracks, indent=2)
261 | )]
262 | case "add_tracks":
263 | logger.info(f"Adding tracks to playlist with arguments: {arguments}")
264 | track_ids = arguments.get("track_ids")
265 | if isinstance(track_ids, str):
266 | try:
267 | track_ids = json.loads(track_ids) # Convert JSON string to Python list
268 | except json.JSONDecodeError:
269 | logger.error("track_ids must be a list or a valid JSON array.")
270 | return [types.TextContent(
271 | type="text",
272 | text="Error: track_ids must be a list or a valid JSON array."
273 | )]
274 |
275 | spotify_client.add_tracks_to_playlist(
276 | playlist_id=arguments.get("playlist_id"),
277 | track_ids=track_ids
278 | )
279 | return [types.TextContent(
280 | type="text",
281 | text="Tracks added to playlist."
282 | )]
283 | case "remove_tracks":
284 | logger.info(f"Removing tracks from playlist with arguments: {arguments}")
285 | track_ids = arguments.get("track_ids")
286 | if isinstance(track_ids, str):
287 | try:
288 | track_ids = json.loads(track_ids) # Convert JSON string to Python list
289 | except json.JSONDecodeError:
290 | logger.error("track_ids must be a list or a valid JSON array.")
291 | return [types.TextContent(
292 | type="text",
293 | text="Error: track_ids must be a list or a valid JSON array."
294 | )]
295 |
296 | spotify_client.remove_tracks_from_playlist(
297 | playlist_id=arguments.get("playlist_id"),
298 | track_ids=track_ids
299 | )
300 | return [types.TextContent(
301 | type="text",
302 | text="Tracks removed from playlist."
303 | )]
304 |
305 | case "change_details":
306 | logger.info(f"Changing playlist details with arguments: {arguments}")
307 | if not arguments.get("playlist_id"):
308 | logger.error("playlist_id is required for change_details action.")
309 | return [types.TextContent(
310 | type="text",
311 | text="playlist_id is required for change_details action."
312 | )]
313 | if not arguments.get("name") and not arguments.get("description"):
314 | logger.error("At least one of name or description is required.")
315 | return [types.TextContent(
316 | type="text",
317 | text="At least one of name or description is required."
318 | )]
319 |
320 | spotify_client.change_playlist_details(
321 | playlist_id=arguments.get("playlist_id"),
322 | name=arguments.get("name"),
323 | description=arguments.get("description")
324 | )
325 | return [types.TextContent(
326 | type="text",
327 | text="Playlist details changed."
328 | )]
329 |
330 | case "create":
331 | logger.info(f"Creating playlist with arguments: {arguments}")
332 | if not arguments.get("name"):
333 | logger.error("name is required for create action.")
334 | return [types.TextContent(
335 | type="text",
336 | text="name is required for create action."
337 | )]
338 |
339 | playlist = spotify_client.create_playlist(
340 | name=arguments.get("name"),
341 | description=arguments.get("description"),
342 | public=arguments.get("public", True)
343 | )
344 | return [types.TextContent(
345 | type="text",
346 | text=json.dumps(playlist, indent=2)
347 | )]
348 |
349 | case _:
350 | return [types.TextContent(
351 | type="text",
352 | text=f"Unknown playlist action: {action}. "
353 | "Supported actions are: get, get_tracks, add_tracks, remove_tracks, change_details, create."
354 | )]
355 | case _:
356 | error_msg = f"Unknown tool: {name}"
357 | logger.error(error_msg)
358 | return [types.TextContent(
359 | type="text",
360 | text=error_msg
361 | )]
362 | except SpotifyException as se:
363 | error_msg = f"Spotify Client error occurred: {str(se)}"
364 | logger.error(error_msg)
365 | return [types.TextContent(
366 | type="text",
367 | text=f"An error occurred with the Spotify Client: {str(se)}"
368 | )]
369 | except Exception as e:
370 | error_msg = f"Unexpected error occurred: {str(e)}"
371 | logger.error(error_msg)
372 | return [types.TextContent(
373 | type="text",
374 | text=error_msg
375 | )]
376 |
377 |
378 | async def main():
379 | try:
380 | async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
381 | await server.run(
382 | read_stream,
383 | write_stream,
384 | server.create_initialization_options()
385 | )
386 | except Exception as e:
387 | logger.error(f"Server error occurred: {str(e)}")
388 | raise
389 |
```
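The `add_tracks` and `remove_tracks` branches above both accept `track_ids` either as a list or as a JSON-encoded array string, and they repeat the same decoding logic. A minimal sketch of how that step could be factored out follows. `coerce_track_ids` is a hypothetical helper that does not exist in `server.py`, and the extra check that the decoded value is a list of strings is an assumption added here; the original code only handles `json.JSONDecodeError`.

```python
import json
from typing import Any, List, Optional


def coerce_track_ids(raw: Any) -> Optional[List[str]]:
    """Return a list of track IDs, or None if `raw` is not usable.

    Hypothetical helper, not part of server.py. Accepts a real list or a
    JSON array string, mirroring what the playlist branches tolerate.
    """
    if isinstance(raw, str):
        try:
            # A client may send the array serialized as a string.
            raw = json.loads(raw)
        except json.JSONDecodeError:
            return None
    # Assumption: reject anything that is not a list of strings
    # (the original handler does not perform this check).
    if not isinstance(raw, list) or not all(isinstance(t, str) for t in raw):
        return None
    return raw
```

A handler branch could then call the helper once and return the existing "track_ids must be a list or a valid JSON array" message whenever it yields `None`, instead of duplicating the `try`/`except` block in both branches.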