# Directory Structure ``` ├── .gitignore ├── code │ ├── account_management.py │ ├── post_tweet.py │ └── retrieve_tweets.py ├── LICENSE ├── package.json ├── README.md └── src ├── formatter.ts ├── index.ts ├── twitter-api.ts └── types.ts ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Dependencies node_modules/ # Environment variables .env # Build output dist/ # Log files npm-debug.log* yarn-debug.log* yarn-error.log* # Editor directories and files .idea/ .vscode/ *.suo *.ntvs* *.njsproj *.sln *.sw? # OS generated files .DS_Store Thumbbs.db ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # <img src="https://www.x.com/favicon.ico" alt="X Logo" width="32" height="32"> X (Twitter) MCP Server This MCP server allows Clients to interact with X (formerly Twitter), enabling comprehensive platform operations including posting tweets, searching content, managing accounts, and organizing lists. ## Quick Start 1. Create an X Developer account and get your API keys from [X Developer Portal](https://developer.twitter.com/en/portal/dashboard) 2. Set all required API keys in the environment variables 3. Clone this repository: `git clone https://github.com/Dishant27/twitter-mcp.git` 4. Install dependencies: `npm install` 5. Run the server: - With environment variables: ```bash TWITTER_API_KEY=your_api_key \ TWITTER_API_SECRET=your_api_secret \ TWITTER_ACCESS_TOKEN=your_access_token \ TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret \ npm start ``` - Using a `.env` file: ```bash # Create a .env file with your X API keys echo "TWITTER_API_KEY=your_api_key TWITTER_API_SECRET=your_api_secret TWITTER_ACCESS_TOKEN=your_access_token TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret" > .env # Start the server npm start ``` 6. Use with a MCP client, such as Claude. ## Claude Configuration To use this server with Claude, you'll need to set up the MCP configuration. Here's an example of how the configuration structure should look: ```json { "name": "x", "display_name": "X", "description": "X MCP allows Claude to interact with X (formerly Twitter)", "path": "path/to/twitter-mcp/dist/index.js", "startup": { "env": { "TWITTER_API_KEY": "your_api_key", "TWITTER_API_SECRET": "your_api_secret", "TWITTER_ACCESS_TOKEN": "your_access_token", "TWITTER_ACCESS_TOKEN_SECRET": "your_access_token_secret" } }, "transport": "stdio" } ``` Save this configuration in your Claude MCP config directory, typically located at: - Windows: `%APPDATA%\AnthropicClaude\mcp-servers` - macOS: `~/Library/Application Support/AnthropicClaude/mcp-servers` - Linux: `~/.config/AnthropicClaude/mcp-servers` ## Features ### Post Operations - Post content (up to 280 characters) - Search for posts by query with customizable result count ### Account Management - Get profile information for any user or the authenticated account - Update profile details (name, bio, location, website URL) - Follow and unfollow users - List followers for any user or the authenticated account - List accounts that a user is following ### List Management - Create new lists (public or private) - Get information about specific lists - Retrieve all lists owned by the authenticated user ## Available MCP Tools | Tool Name | Description | |-----------|-------------| | `post_tweet` | Post new content to X | | `search_tweets` | Search for content on X | | `get_profile` | Get profile information for a user or the authenticated account | | `update_profile` | Update the authenticated user's profile | | `follow_user` | Follow a user | | `unfollow_user` | Unfollow a user | | `list_followers` | List followers of a user or the authenticated account | | `list_following` | List accounts that a user or the authenticated account is following | | `create_list` | Create a new list | | `get_list_info` | Get information about a list | | `get_user_lists` | Get all lists owned by the authenticated user | ## Requirements - Node.js 18.x or higher - X Developer account with API keys - API v1 and v2 access ## Environment Variables | Variable | Description | |----------|-------------| | `TWITTER_API_KEY` | Your API key | | `TWITTER_API_SECRET` | Your API secret | | `TWITTER_ACCESS_TOKEN` | Your access token | | `TWITTER_ACCESS_TOKEN_SECRET` | Your access token secret | ## Repository Structure ``` twitter-mcp/ ├── .github/ │ └── workflows/ │ ├── publish.yml │ └── release.yml ├── code/ │ ├── account_management.py # Sample Python code for account management │ ├── post_tweet.py # Sample Python code for posting content │ └── retrieve_tweets.py # Sample Python code for retrieving content ├── src/ │ ├── index.ts # Main entry point │ ├── twitter-api.ts # X API client │ ├── formatter.ts # Response formatter │ └── types.ts # Type definitions ├── .env.example ├── .gitignore ├── Dockerfile ├── LICENSE ├── package.json ├── README.md └── tsconfig.json ``` ## License MIT ``` -------------------------------------------------------------------------------- /package.json: -------------------------------------------------------------------------------- ```json { "name": "twitter-mcp", "version": "1.0.0", "description": "Model Context Protocol server for Twitter integration", "main": "dist/index.js", "bin": { "twitter-mcp": "dist/index.js" }, "scripts": { "build": "tsc", "start": "node dist/index.js", "dev": "ts-node src/index.ts", "prepublishOnly": "npm run build" }, "files": [ "dist", "README.md", "LICENSE" ], "keywords": [ "mcp", "twitter", "api", "model", "context", "protocol" ], "author": "Dishant27", "license": "MIT", "dependencies": { "@modelcontextprotocol/sdk": "1.2.0", "dotenv": "^16.4.5", "twitter-api-v2": "^1.16.0" }, "devDependencies": { "ts-node": "^10.9.2", "typescript": "^5.3.3" } } ``` -------------------------------------------------------------------------------- /src/types.ts: -------------------------------------------------------------------------------- ```typescript import z from 'zod'; // Base configuration export const ConfigSchema = z.object({ apiKey: z.string(), apiSecretKey: z.string(), accessToken: z.string(), accessTokenSecret: z.string(), }); export type Config = z.infer<typeof ConfigSchema>; // Twitter API error export class TwitterError extends Error { constructor( message: string, public readonly code: number = 0, public readonly data?: any ) { super(message); this.name = 'TwitterError'; } static isRateLimit(error: TwitterError): boolean { return error.code === 88 || error.message.includes('rate limit'); } } // Schemas for tool inputs export const PostTweetSchema = z.object({ text: z.string().max(280), }); export const SearchTweetsSchema = z.object({ query: z.string(), count: z.number().min(10).max(100), }); // Account Management Schemas export const GetProfileSchema = z.object({ username: z.string().optional(), }); export const UpdateProfileSchema = z.object({ name: z.string().max(50).optional(), description: z.string().max(160).optional(), location: z.string().max(30).optional(), url: z.string().url().max(100).optional(), }).refine( data => Object.keys(data).length > 0, { message: "At least one profile field must be provided" } ); export const FollowUserSchema = z.object({ username: z.string(), }); export const UnfollowUserSchema = z.object({ username: z.string(), }); export const ListFollowersSchema = z.object({ username: z.string().optional(), count: z.number().min(1).max(200).default(20), }); export const ListFollowingSchema = z.object({ username: z.string().optional(), count: z.number().min(1).max(200).default(20), }); export const CreateListSchema = z.object({ name: z.string().max(25), description: z.string().max(100).optional(), private: z.boolean().default(false), }); export const ListInfoSchema = z.object({ listId: z.string(), }); // Types for Twitter responses export interface TwitterUser { id: string; name: string; username: string; description?: string; profileImageUrl?: string; verified: boolean; followersCount: number; followingCount: number; createdAt: string; } export interface Tweet { id: string; text: string; authorId: string; createdAt: string; publicMetrics: { retweetCount: number; replyCount: number; likeCount: number; quoteCount: number; }; } export interface TwitterList { id: string; name: string; description: string; memberCount: number; followerCount: number; private: boolean; ownerId: string; } ``` -------------------------------------------------------------------------------- /code/retrieve_tweets.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Sample code to retrieve tweets using Twitter API """ import tweepy import json def setup_twitter_client(): """ Setup Twitter API client with authentication """ # Replace these placeholder values with your actual Twitter API credentials consumer_key = "YOUR_CONSUMER_KEY" consumer_secret = "YOUR_CONSUMER_SECRET" access_token = "YOUR_ACCESS_TOKEN" access_token_secret = "YOUR_ACCESS_TOKEN_SECRET" # Authenticate to Twitter auth = tweepy.OAuth1UserHandler( consumer_key, consumer_secret, access_token, access_token_secret ) # Create API object api = tweepy.API(auth) return api def get_user_timeline(api, username, count=10): """ Retrieve recent tweets from a user's timeline Args: api: Authenticated tweepy API object username: Twitter username to retrieve tweets from count: Number of tweets to retrieve (default: 10) Returns: List of tweets """ try: tweets = api.user_timeline(screen_name=username, count=count, tweet_mode="extended") return tweets except Exception as e: print(f"Error retrieving tweets for {username}: {e}") return [] def search_tweets(api, query, count=10): """ Search for tweets matching a query Args: api: Authenticated tweepy API object query: Search query string count: Number of tweets to retrieve (default: 10) Returns: List of tweets matching the query """ try: tweets = api.search_tweets(q=query, count=count, tweet_mode="extended") return tweets except Exception as e: print(f"Error searching for tweets with query '{query}': {e}") return [] def display_tweets(tweets): """ Pretty print tweet information """ for tweet in tweets: print(f"\n{'=' * 50}") print(f"User: @{tweet.user.screen_name}") print(f"Tweet ID: {tweet.id}") print(f"Created at: {tweet.created_at}") print(f"Content: {tweet.full_text}") print(f"Retweets: {tweet.retweet_count}, Likes: {tweet.favorite_count}") def main(): # Setup Twitter client api = setup_twitter_client() # Example 1: Get tweets from a user's timeline username = "twitter" print(f"\nRetrieving recent tweets from @{username}...") user_tweets = get_user_timeline(api, username, count=5) display_tweets(user_tweets) # Example 2: Search for tweets based on a query search_query = "#Python" print(f"\nSearching for tweets with query '{search_query}'...") search_results = search_tweets(api, search_query, count=5) display_tweets(search_results) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /code/post_tweet.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Sample code to post tweets using Twitter API """ import tweepy import os from datetime import datetime def setup_twitter_client(): """ Setup Twitter API client with authentication """ # Replace these placeholder values with your actual Twitter API credentials consumer_key = "YOUR_CONSUMER_KEY" consumer_secret = "YOUR_CONSUMER_SECRET" access_token = "YOUR_ACCESS_TOKEN" access_token_secret = "YOUR_ACCESS_TOKEN_SECRET" # Authenticate to Twitter auth = tweepy.OAuth1UserHandler( consumer_key, consumer_secret, access_token, access_token_secret ) # Create API object api = tweepy.API(auth) return api def post_tweet(api, text): """ Post a simple text tweet Args: api: Authenticated tweepy API object text: Tweet content (max 280 characters) Returns: Posted tweet object if successful, None otherwise """ try: if len(text) > 280: print(f"Tweet is too long ({len(text)} characters). Maximum is 280 characters.") return None tweet = api.update_status(text) print(f"Tweet posted successfully! Tweet ID: {tweet.id}") return tweet except Exception as e: print(f"Error posting tweet: {e}") return None def post_tweet_with_media(api, text, media_path): """ Post a tweet with attached media (image, gif, or video) Args: api: Authenticated tweepy API object text: Tweet content media_path: Path to media file to upload Returns: Posted tweet object if successful, None otherwise """ try: if not os.path.exists(media_path): print(f"Media file not found: {media_path}") return None # Upload media to Twitter media = api.media_upload(media_path) # Post tweet with media tweet = api.update_status(text, media_ids=[media.media_id]) print(f"Tweet with media posted successfully! Tweet ID: {tweet.id}") return tweet except Exception as e: print(f"Error posting tweet with media: {e}") return None def reply_to_tweet(api, tweet_id, reply_text): """ Reply to an existing tweet Args: api: Authenticated tweepy API object tweet_id: ID of the tweet to reply to reply_text: Content of the reply Returns: Posted reply tweet object if successful, None otherwise """ try: reply = api.update_status( status=reply_text, in_reply_to_status_id=tweet_id, auto_populate_reply_metadata=True ) print(f"Reply posted successfully! Reply ID: {reply.id}") return reply except Exception as e: print(f"Error posting reply: {e}") return None def main(): # Setup Twitter client api = setup_twitter_client() # Example 1: Post a simple text tweet tweet_text = f"Testing the Twitter API with Python! #Python #TwitterAPI (posted at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')})" tweet = post_tweet(api, tweet_text) if tweet: # Example 2: Reply to our own tweet reply_text = "This is a reply to my previous tweet using Python!" reply_to_tweet(api, tweet.id, reply_text) # Example 3: Post a tweet with media (uncomment to test) # media_path = "path/to/your/image.jpg" # Replace with an actual path # post_tweet_with_media(api, "Check out this image! #Python", media_path) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/formatter.ts: -------------------------------------------------------------------------------- ```typescript import { Tweet, TwitterUser, TwitterList } from './types'; export class ResponseFormatter { /** * Format search response */ static formatSearchResponse( query: string, tweets: Tweet[], users: Record<string, TwitterUser> ): string { let response = `Search results for "${query}":\n\n`; if (tweets.length === 0) { return response + 'No tweets found matching your query.'; } tweets.forEach((tweet, index) => { const user = users[tweet.authorId]; response += `${index + 1}. @${user?.username || 'Unknown'} `; if (user?.verified) response += '✓ '; response += `(${user?.name || 'Unknown'}):\n`; response += `${tweet.text}\n`; response += `🔁 ${tweet.publicMetrics.retweetCount} | ❤️ ${tweet.publicMetrics.likeCount} | 💬 ${tweet.publicMetrics.replyCount}\n`; response += `🔗 https://twitter.com/${user?.username}/status/${tweet.id}\n\n`; }); return response; } /** * Format user profile */ static formatUserProfile(user: TwitterUser): string { let response = `Profile for @${user.username} `; if (user.verified) response += '✓'; response += `\n\n`; response += `Name: ${user.name}\n`; if (user.description) response += `Bio: ${user.description}\n`; response += `Followers: ${user.followersCount.toLocaleString()} | Following: ${user.followingCount.toLocaleString()}\n`; response += `Account created: ${new Date(user.createdAt).toLocaleDateString()}\n`; response += `🔗 https://twitter.com/${user.username}\n`; return response; } /** * Format users list (followers or following) */ static formatUsersList(users: TwitterUser[], listType: 'followers' | 'following'): string { const title = listType === 'followers' ? 'Followers' : 'Following'; let response = `${title} (${users.length}):\n\n`; if (users.length === 0) { return response + `No ${listType.toLowerCase()} found.`; } users.forEach((user, index) => { response += `${index + 1}. @${user.username} `; if (user.verified) response += '✓ '; response += `(${user.name})\n`; if (user.description) { // Truncate description if too long const desc = user.description.length > 50 ? user.description.substring(0, 47) + '...' : user.description; response += ` ${desc}\n`; } response += ` Followers: ${user.followersCount.toLocaleString()}\n\n`; }); return response; } /** * Format Twitter lists */ static formatLists(lists: TwitterList[]): string { let response = `Twitter Lists (${lists.length}):\n\n`; if (lists.length === 0) { return response + 'No lists found.'; } lists.forEach((list, index) => { response += `${index + 1}. ${list.name} ${list.private ? '🔒' : ''}\n`; if (list.description) response += ` ${list.description}\n`; response += ` Members: ${list.memberCount.toLocaleString()} | Followers: ${list.followerCount.toLocaleString()}\n\n`; }); return response; } /** * Format Twitter list info */ static formatListInfo(list: TwitterList): string { let response = `List: ${list.name} ${list.private ? '🔒' : ''}\n\n`; if (list.description) response += `Description: ${list.description}\n`; response += `Privacy: ${list.private ? 'Private' : 'Public'}\n`; response += `Members: ${list.memberCount.toLocaleString()}\n`; response += `Followers: ${list.followerCount.toLocaleString()}\n`; response += `🔗 https://twitter.com/i/lists/${list.id}\n`; return response; } /** * Format response for MCP */ static toMcpResponse(text: string): string { return text; } } ``` -------------------------------------------------------------------------------- /src/twitter-api.ts: -------------------------------------------------------------------------------- ```typescript import Twitter from 'twitter-api-v2'; import { Config, TwitterError, TwitterUser, Tweet, TwitterList } from './types'; export class TwitterClient { private client: Twitter; constructor(config: Config) { this.client = new Twitter({ appKey: config.apiKey, appSecret: config.apiSecretKey, accessToken: config.accessToken, accessSecret: config.accessTokenSecret, }); } /** * Post a new tweet */ async postTweet(text: string): Promise<{ id: string }> { try { const { data } = await this.client.v2.tweet(text); return { id: data.id }; } catch (error) { this.handleTwitterError(error); throw error; // TypeScript needs this } } /** * Search for tweets */ async searchTweets(query: string, count: number): Promise<{ tweets: Tweet[]; users: Record<string, TwitterUser> }> { try { const result = await this.client.v2.search(query, { max_results: count, 'tweet.fields': 'created_at,public_metrics,author_id', 'user.fields': 'profile_image_url,description,created_at,verified,public_metrics', expansions: 'author_id', }); const tweets: Tweet[] = result.data.data.map(tweet => ({ id: tweet.id, text: tweet.text, authorId: tweet.author_id, createdAt: tweet.created_at, publicMetrics: { retweetCount: tweet.public_metrics?.retweet_count || 0, replyCount: tweet.public_metrics?.reply_count || 0, likeCount: tweet.public_metrics?.like_count || 0, quoteCount: tweet.public_metrics?.quote_count || 0, }, })); const users: Record<string, TwitterUser> = {}; result.includes.users?.forEach(user => { users[user.id] = { id: user.id, name: user.name, username: user.username, description: user.description, profileImageUrl: user.profile_image_url, verified: user.verified || false, followersCount: user.public_metrics?.followers_count || 0, followingCount: user.public_metrics?.following_count || 0, createdAt: user.created_at || '', }; }); return { tweets, users }; } catch (error) { this.handleTwitterError(error); throw error; // TypeScript needs this } } /** * Get user profile information */ async getUserProfile(username?: string): Promise<TwitterUser> { try { const fields = 'profile_image_url,description,created_at,verified,public_metrics'; // Get current user profile if no username is provided const user = username ? await this.client.v2.userByUsername(username, { 'user.fields': fields }) : await this.client.v2.me({ 'user.fields': fields }); const userData = user.data; return { id: userData.id, name: userData.name, username: userData.username, description: userData.description, profileImageUrl: userData.profile_image_url, verified: userData.verified || false, followersCount: userData.public_metrics?.followers_count || 0, followingCount: userData.public_metrics?.following_count || 0, createdAt: userData.created_at || '', }; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Update user profile */ async updateProfile(profileData: { name?: string; description?: string; location?: string; url?: string; }): Promise<TwitterUser> { try { const result = await this.client.v1.updateAccountProfile(profileData); return { id: result.id_str, name: result.name, username: result.screen_name, description: result.description, profileImageUrl: result.profile_image_url_https, verified: result.verified, followersCount: result.followers_count, followingCount: result.friends_count, createdAt: result.created_at, }; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Follow a user */ async followUser(username: string): Promise<TwitterUser> { try { const result = await this.client.v2.follow( await this.getUserIdByUsername(username) ); if (!result.data.following) { throw new TwitterError('Failed to follow user'); } return this.getUserProfile(username); } catch (error) { this.handleTwitterError(error); throw error; } } /** * Unfollow a user */ async unfollowUser(username: string): Promise<TwitterUser> { try { const result = await this.client.v2.unfollow( await this.getUserIdByUsername(username) ); if (!result.data.following) { return this.getUserProfile(username); } else { throw new TwitterError('Failed to unfollow user'); } } catch (error) { this.handleTwitterError(error); throw error; } } /** * Get followers of a user */ async getFollowers(username?: string, count: number = 20): Promise<TwitterUser[]> { try { const userId = username ? await this.getUserIdByUsername(username) : (await this.client.v2.me()).data.id; const result = await this.client.v2.followers(userId, { max_results: count, 'user.fields': 'profile_image_url,description,created_at,verified,public_metrics', }); return result.data.map(user => ({ id: user.id, name: user.name, username: user.username, description: user.description, profileImageUrl: user.profile_image_url, verified: user.verified || false, followersCount: user.public_metrics?.followers_count || 0, followingCount: user.public_metrics?.following_count || 0, createdAt: user.created_at || '', })); } catch (error) { this.handleTwitterError(error); throw error; } } /** * Get users that a user is following */ async getFollowing(username?: string, count: number = 20): Promise<TwitterUser[]> { try { const userId = username ? await this.getUserIdByUsername(username) : (await this.client.v2.me()).data.id; const result = await this.client.v2.following(userId, { max_results: count, 'user.fields': 'profile_image_url,description,created_at,verified,public_metrics', }); return result.data.map(user => ({ id: user.id, name: user.name, username: user.username, description: user.description, profileImageUrl: user.profile_image_url, verified: user.verified || false, followersCount: user.public_metrics?.followers_count || 0, followingCount: user.public_metrics?.following_count || 0, createdAt: user.created_at || '', })); } catch (error) { this.handleTwitterError(error); throw error; } } /** * Create a Twitter list */ async createList(name: string, description?: string, isPrivate: boolean = false): Promise<TwitterList> { try { const result = await this.client.v2.createList({ name, description, private: isPrivate, }); return { id: result.data.id, name: result.data.name, description: result.data.description || '', memberCount: 0, followerCount: 0, private: result.data.private || false, ownerId: await this.getCurrentUserId(), }; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Get list information */ async getListInfo(listId: string): Promise<TwitterList> { try { const result = await this.client.v2.list(listId, { 'list.fields': 'follower_count,member_count,owner_id,private', }); return { id: result.data.id, name: result.data.name, description: result.data.description || '', memberCount: result.data.member_count || 0, followerCount: result.data.follower_count || 0, private: result.data.private || false, ownerId: result.data.owner_id || '', }; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Get user lists */ async getUserLists(): Promise<TwitterList[]> { try { const userId = await this.getCurrentUserId(); const result = await this.client.v2.listsOwned(userId, { 'list.fields': 'follower_count,member_count,owner_id,private', }); return result.data.map(list => ({ id: list.id, name: list.name, description: list.description || '', memberCount: list.member_count || 0, followerCount: list.follower_count || 0, private: list.private || false, ownerId: list.owner_id || userId, })); } catch (error) { this.handleTwitterError(error); throw error; } } /** * Helper: Get user ID by username */ private async getUserIdByUsername(username: string): Promise<string> { try { const result = await this.client.v2.userByUsername(username); return result.data.id; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Helper: Get current user ID */ private async getCurrentUserId(): Promise<string> { try { const result = await this.client.v2.me(); return result.data.id; } catch (error) { this.handleTwitterError(error); throw error; } } /** * Error handler */ private handleTwitterError(error: any): never { console.error('Twitter API error:', error); // Handle rate limiting if (error.code === 88 || (error.errors && error.errors[0]?.code === 88)) { throw new TwitterError('Twitter rate limit exceeded', 88, error); } // Handle auth errors if ([32, 89, 135, 215, 226].includes(error.code) || (error.errors && [32, 89, 135, 215, 226].includes(error.errors[0]?.code))) { throw new TwitterError('Twitter authentication error', error.code || 0, error); } // For all other errors const message = error.message || (error.errors && error.errors[0]?.message) || 'Unknown Twitter API error'; const code = error.code || (error.errors && error.errors[0]?.code) || 0; throw new TwitterError(message, code, error); } } ``` -------------------------------------------------------------------------------- /code/account_management.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Sample code for Twitter account management operations """ import tweepy import os from PIL import Image import io def setup_twitter_client(): """ Setup Twitter API client with authentication """ # Replace these placeholder values with your actual Twitter API credentials consumer_key = "YOUR_CONSUMER_KEY" consumer_secret = "YOUR_CONSUMER_SECRET" access_token = "YOUR_ACCESS_TOKEN" access_token_secret = "YOUR_ACCESS_TOKEN_SECRET" # Authenticate to Twitter auth = tweepy.OAuth1UserHandler( consumer_key, consumer_secret, access_token, access_token_secret ) # Create API object api = tweepy.API(auth) return api def get_account_info(api): """ Get information about the authenticated user's account Args: api: Authenticated tweepy API object Returns: User object containing account information """ try: me = api.verify_credentials() print(f"Account information for @{me.screen_name}:") print(f" - Display name: {me.name}") print(f" - Bio: {me.description}") print(f" - Location: {me.location}") print(f" - Following: {me.friends_count}, Followers: {me.followers_count}") print(f" - Tweets: {me.statuses_count}") print(f" - Account created: {me.created_at}") return me except Exception as e: print(f"Error getting account information: {e}") return None def update_profile(api, **kwargs): """ Update the authenticated user's profile information Args: api: Authenticated tweepy API object kwargs: Optional fields to update (name, description, location, url) Returns: Updated user object if successful, None otherwise """ try: # Only pass parameters that are provided update_params = {} if 'name' in kwargs: update_params['name'] = kwargs['name'] if 'description' in kwargs: update_params['description'] = kwargs['description'] if 'location' in kwargs: update_params['location'] = kwargs['location'] if 'url' in kwargs: update_params['url'] = kwargs['url'] # Update profile updated_user = api.update_profile(**update_params) print(f"Profile updated successfully for @{updated_user.screen_name}") return updated_user except Exception as e: print(f"Error updating profile: {e}") return None def update_profile_image(api, image_path): """ Update the authenticated user's profile image Args: api: Authenticated tweepy API object image_path: Path to the new profile image file Returns: Updated user object if successful, None otherwise """ try: if not os.path.exists(image_path): print(f"Image file not found: {image_path}") return None # Check image size and format with Image.open(image_path) as img: width, height = img.size print(f"Image dimensions: {width}x{height}") # Twitter recommends 400x400 pixels for profile images if width < 400 or height < 400: print("Warning: Twitter recommends profile images of at least 400x400 pixels") # Update profile image updated_user = api.update_profile_image(filename=image_path) print(f"Profile image updated successfully for @{updated_user.screen_name}") return updated_user except Exception as e: print(f"Error updating profile image: {e}") return None def update_profile_banner(api, banner_path): """ Update the authenticated user's profile banner Args: api: Authenticated tweepy API object banner_path: Path to the new banner image file Returns: True if successful, False otherwise """ try: if not os.path.exists(banner_path): print(f"Banner file not found: {banner_path}") return False # Check image size and format with Image.open(banner_path) as img: width, height = img.size print(f"Banner dimensions: {width}x{height}") # Twitter recommends 1500x500 pixels for banners if width < 1500 or height < 500: print("Warning: Twitter recommends banner images of 1500x500 pixels") # Update profile banner api.update_profile_banner(filename=banner_path) print("Profile banner updated successfully") return True except Exception as e: print(f"Error updating profile banner: {e}") return False def get_followers(api, count=20): """ Get a list of users following the authenticated user Args: api: Authenticated tweepy API object count: Number of followers to retrieve (default: 20) Returns: List of follower user objects """ try: followers = api.get_followers(count=count) print(f"Retrieved {len(followers)} followers:") for i, follower in enumerate(followers, 1): print(f" {i}. @{follower.screen_name} - {follower.name}") return followers except Exception as e: print(f"Error retrieving followers: {e}") return [] def get_following(api, count=20): """ Get a list of users that the authenticated user is following Args: api: Authenticated tweepy API object count: Number of following users to retrieve (default: 20) Returns: List of following user objects """ try: following = api.get_friends(count=count) print(f"Retrieved {len(following)} accounts you are following:") for i, friend in enumerate(following, 1): print(f" {i}. @{friend.screen_name} - {friend.name}") return following except Exception as e: print(f"Error retrieving following accounts: {e}") return [] def follow_user(api, username): """ Follow a specified user Args: api: Authenticated tweepy API object username: Screen name of the user to follow Returns: Followed user object if successful, None otherwise """ try: user = api.create_friendship(screen_name=username) print(f"Successfully followed @{user.screen_name}") return user except Exception as e: print(f"Error following user @{username}: {e}") return None def unfollow_user(api, username): """ Unfollow a specified user Args: api: Authenticated tweepy API object username: Screen name of the user to unfollow Returns: Unfollowed user object if successful, None otherwise """ try: user = api.destroy_friendship(screen_name=username) print(f"Successfully unfollowed @{user.screen_name}") return user except Exception as e: print(f"Error unfollowing user @{username}: {e}") return None def create_list(api, name, description, private=False): """ Create a new Twitter list Args: api: Authenticated tweepy API object name: Name of the list description: Description of the list private: Whether the list should be private (default: False) Returns: Created list object if successful, None otherwise """ try: new_list = api.create_list(name=name, description=description, mode='private' if private else 'public') print(f"List '{new_list.name}' created successfully") return new_list except Exception as e: print(f"Error creating list: {e}") return None def get_lists(api): """ Get all lists owned by the authenticated user Args: api: Authenticated tweepy API object Returns: List of owned lists """ try: owned_lists = api.get_lists() print(f"Retrieved {len(owned_lists)} lists:") for i, lst in enumerate(owned_lists, 1): print(f" {i}. {lst.name} - {lst.description} ({lst.member_count} members)") return owned_lists except Exception as e: print(f"Error retrieving lists: {e}") return [] def main(): # Setup Twitter client api = setup_twitter_client() # Example 1: Get account information print("\n=== Account Information ===") account_info = get_account_info(api) # Example 2: Update profile information print("\n=== Update Profile Information ===") print("Note: Commented out to prevent actual updates") # update_profile( # api, # name="Updated Name", # description="This is an updated bio using Python Tweepy!", # location="San Francisco, CA", # url="https://example.com" # ) # Example 3: Update profile image print("\n=== Update Profile Image ===") print("Note: Commented out to prevent actual updates") # profile_image_path = "path/to/profile/image.jpg" # Replace with actual path # update_profile_image(api, profile_image_path) # Example 4: Update profile banner print("\n=== Update Profile Banner ===") print("Note: Commented out to prevent actual updates") # banner_image_path = "path/to/banner/image.jpg" # Replace with actual path # update_profile_banner(api, banner_image_path) # Example 5: Get followers print("\n=== Get Followers ===") followers = get_followers(api, count=5) # Limit to 5 for example # Example 6: Get accounts you're following print("\n=== Get Following ===") following = get_following(api, count=5) # Limit to 5 for example # Example 7: Follow a user print("\n=== Follow User ===") print("Note: Commented out to prevent actual follow") # follow_user(api, "twitter") # Example 8: Unfollow a user print("\n=== Unfollow User ===") print("Note: Commented out to prevent actual unfollow") # unfollow_user(api, "twitter") # Example 9: Create a Twitter list print("\n=== Create Twitter List ===") print("Note: Commented out to prevent actual list creation") # create_list(api, "Python Developers", "A list of Python developers and organizations", private=False) # Example 10: Get all owned lists print("\n=== Get Owned Lists ===") owned_lists = get_lists(api) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/index.ts: -------------------------------------------------------------------------------- ```typescript #!/usr/bin/env node import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { ListToolsRequestSchema, CallToolRequestSchema, Tool, ErrorCode, McpError, TextContent } from '@modelcontextprotocol/sdk/types.js'; import { TwitterClient } from './twitter-api.js'; import { ResponseFormatter } from './formatter.js'; import { Config, ConfigSchema, PostTweetSchema, SearchTweetsSchema, GetProfileSchema, UpdateProfileSchema, FollowUserSchema, UnfollowUserSchema, ListFollowersSchema, ListFollowingSchema, CreateListSchema, ListInfoSchema, TwitterError } from './types.js'; import dotenv from 'dotenv'; export class TwitterServer { private server: Server; private client: TwitterClient; constructor(config: Config) { // Validate config const result = ConfigSchema.safeParse(config); if (!result.success) { throw new Error(`Invalid configuration: ${result.error.message}`); } this.client = new TwitterClient(config); this.server = new Server({ name: 'twitter-mcp', version: '1.0.0' }, { capabilities: { tools: {} } }); this.setupHandlers(); } private setupHandlers(): void { // Error handler this.server.onerror = (error) => { console.error('[MCP Error]:', error); }; // Graceful shutdown process.on('SIGINT', async () => { console.error('Shutting down server...'); await this.server.close(); process.exit(0); }); // Register tool handlers this.setupToolHandlers(); } private setupToolHandlers(): void { // List available tools this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [ // Tweet operations { name: 'post_tweet', description: 'Post a new tweet to Twitter', inputSchema: { type: 'object', properties: { text: { type: 'string', description: 'The content of your tweet', maxLength: 280 } }, required: ['text'] } } as Tool, { name: 'search_tweets', description: 'Search for tweets on Twitter', inputSchema: { type: 'object', properties: { query: { type: 'string', description: 'Search query' }, count: { type: 'number', description: 'Number of tweets to return (10-100)', minimum: 10, maximum: 100 } }, required: ['query', 'count'] } } as Tool, // Account management operations { name: 'get_profile', description: 'Get Twitter profile information for a user or the authenticated account', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'Twitter username (if not provided, returns authenticated user profile)' } }, required: [] } } as Tool, { name: 'update_profile', description: 'Update the authenticated user\'s Twitter profile', inputSchema: { type: 'object', properties: { name: { type: 'string', description: 'Display name (max 50 chars)' }, description: { type: 'string', description: 'Bio (max 160 chars)' }, location: { type: 'string', description: 'Location (max 30 chars)' }, url: { type: 'string', description: 'Website URL (max 100 chars)' } }, required: [] } } as Tool, { name: 'follow_user', description: 'Follow a Twitter user', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'Twitter username to follow' } }, required: ['username'] } } as Tool, { name: 'unfollow_user', description: 'Unfollow a Twitter user', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'Twitter username to unfollow' } }, required: ['username'] } } as Tool, { name: 'list_followers', description: 'List followers of a Twitter user or the authenticated account', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'Twitter username (if not provided, returns authenticated user\'s followers)' }, count: { type: 'number', description: 'Number of followers to return (1-200)', minimum: 1, maximum: 200, default: 20 } }, required: [] } } as Tool, { name: 'list_following', description: 'List accounts that a Twitter user or the authenticated account is following', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'Twitter username (if not provided, returns authenticated user\'s following)' }, count: { type: 'number', description: 'Number of accounts to return (1-200)', minimum: 1, maximum: 200, default: 20 } }, required: [] } } as Tool, { name: 'create_list', description: 'Create a new Twitter list', inputSchema: { type: 'object', properties: { name: { type: 'string', description: 'List name (max 25 chars)' }, description: { type: 'string', description: 'List description (max 100 chars)' }, private: { type: 'boolean', description: 'Whether the list should be private (default: false)' } }, required: ['name'] } } as Tool, { name: 'get_list_info', description: 'Get information about a Twitter list', inputSchema: { type: 'object', properties: { listId: { type: 'string', description: 'Twitter list ID' } }, required: ['listId'] } } as Tool, { name: 'get_user_lists', description: 'Get all lists owned by the authenticated user', inputSchema: { type: 'object', properties: {}, required: [] } } as Tool ] })); // Handle tool execution this.server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; console.error(`Tool called: ${name}`, args); try { switch (name) { // Tweet operations case 'post_tweet': return await this.handlePostTweet(args); case 'search_tweets': return await this.handleSearchTweets(args); // Account management operations case 'get_profile': return await this.handleGetProfile(args); case 'update_profile': return await this.handleUpdateProfile(args); case 'follow_user': return await this.handleFollowUser(args); case 'unfollow_user': return await this.handleUnfollowUser(args); case 'list_followers': return await this.handleListFollowers(args); case 'list_following': return await this.handleListFollowing(args); case 'create_list': return await this.handleCreateList(args); case 'get_list_info': return await this.handleGetListInfo(args); case 'get_user_lists': return await this.handleGetUserLists(args); default: throw new McpError( ErrorCode.MethodNotFound, `Unknown tool: ${name}` ); } } catch (error) { return this.handleError(error); } }); } // Tweet operations handlers private async handlePostTweet(args: unknown) { const result = PostTweetSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const tweet = await this.client.postTweet(result.data.text); return { content: [{ type: 'text', text: `Tweet posted successfully!\nURL: https://twitter.com/status/${tweet.id}` }] as TextContent[] }; } private async handleSearchTweets(args: unknown) { const result = SearchTweetsSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const { tweets, users } = await this.client.searchTweets( result.data.query, result.data.count ); const formattedResponse = ResponseFormatter.formatSearchResponse( result.data.query, tweets, users ); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } // Account management operations handlers private async handleGetProfile(args: unknown) { const result = GetProfileSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const profile = await this.client.getUserProfile(result.data.username); const formattedResponse = ResponseFormatter.formatUserProfile(profile); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleUpdateProfile(args: unknown) { const result = UpdateProfileSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const updatedProfile = await this.client.updateProfile({ name: result.data.name, description: result.data.description, location: result.data.location, url: result.data.url }); const formattedResponse = `Profile updated successfully!\n\n${ResponseFormatter.formatUserProfile(updatedProfile)}`; return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleFollowUser(args: unknown) { const result = FollowUserSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const user = await this.client.followUser(result.data.username); const formattedResponse = `Successfully followed @${user.username}!\n\n${ResponseFormatter.formatUserProfile(user)}`; return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleUnfollowUser(args: unknown) { const result = UnfollowUserSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const user = await this.client.unfollowUser(result.data.username); const formattedResponse = `Successfully unfollowed @${user.username}!\n\n${ResponseFormatter.formatUserProfile(user)}`; return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleListFollowers(args: unknown) { const result = ListFollowersSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const followers = await this.client.getFollowers( result.data.username, result.data.count ); const formattedResponse = ResponseFormatter.formatUsersList(followers, 'followers'); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleListFollowing(args: unknown) { const result = ListFollowingSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const following = await this.client.getFollowing( result.data.username, result.data.count ); const formattedResponse = ResponseFormatter.formatUsersList(following, 'following'); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleCreateList(args: unknown) { const result = CreateListSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const list = await this.client.createList( result.data.name, result.data.description, result.data.private ); const formattedResponse = `List "${list.name}" created successfully!\n\n${ResponseFormatter.formatListInfo(list)}`; return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleGetListInfo(args: unknown) { const result = ListInfoSchema.safeParse(args); if (!result.success) { throw new McpError( ErrorCode.InvalidParams, `Invalid parameters: ${result.error.message}` ); } const list = await this.client.getListInfo(result.data.listId); const formattedResponse = ResponseFormatter.formatListInfo(list); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private async handleGetUserLists(args: unknown) { // No parameters needed for this endpoint const lists = await this.client.getUserLists(); const formattedResponse = ResponseFormatter.formatLists(lists); return { content: [{ type: 'text', text: ResponseFormatter.toMcpResponse(formattedResponse) }] as TextContent[] }; } private handleError(error: unknown) { if (error instanceof McpError) { throw error; } if (error instanceof TwitterError) { if (TwitterError.isRateLimit(error)) { return { content: [{ type: 'text', text: 'Rate limit exceeded. Please wait a moment before trying again.', isError: true }] as TextContent[] }; } return { content: [{ type: 'text', text: `Twitter API error: ${(error as TwitterError).message}`, isError: true }] as TextContent[] }; } console.error('Unexpected error:', error); throw new McpError( ErrorCode.InternalError, 'An unexpected error occurred' ); } async start(): Promise<void> { const transport = new StdioServerTransport(); await this.server.connect(transport); console.error('Twitter MCP server running on stdio'); } } // Start the server dotenv.config(); const config = { apiKey: process.env.TWITTER_API_KEY!, apiSecretKey: process.env.TWITTER_API_SECRET!, accessToken: process.env.TWITTER_ACCESS_TOKEN!, accessTokenSecret: process.env.TWITTER_ACCESS_TOKEN_SECRET! }; const server = new TwitterServer(config); server.start().catch(error => { console.error('Failed to start server:', error); process.exit(1); }); ```