# Directory Structure
```
├── .gitignore
├── code
│   ├── account_management.py
│   ├── post_tweet.py
│   └── retrieve_tweets.py
├── LICENSE
├── package.json
├── README.md
└── src
    ├── formatter.ts
    ├── index.ts
    ├── twitter-api.ts
    └── types.ts
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Dependencies
node_modules/
# Environment variables
.env
# Build output
dist/
# Log files
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Editor directories and files
.idea/
.vscode/
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
# OS generated files
.DS_Store
Thumbs.db
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# <img src="https://www.x.com/favicon.ico" alt="X Logo" width="32" height="32"> X (Twitter) MCP Server
This MCP server lets MCP clients interact with X (formerly Twitter): posting tweets, searching content, managing accounts, and organizing lists.
## Quick Start
1. Create an X Developer account and get your API keys from [X Developer Portal](https://developer.twitter.com/en/portal/dashboard)
2. Set all required API keys in the environment variables
3. Clone this repository: `git clone https://github.com/Dishant27/twitter-mcp.git`
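4. Install dependencies and compile the TypeScript sources (`npm start` runs `dist/index.js`, which only exists after a build): `npm install && npm run build`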
5. Run the server:
   - With environment variables:
     ```bash
     TWITTER_API_KEY=your_api_key \
     TWITTER_API_SECRET=your_api_secret \
     TWITTER_ACCESS_TOKEN=your_access_token \
     TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret \
     npm start
     ```
   - Using a `.env` file:
     ```bash
     # Create a .env file with your X API keys
     echo "TWITTER_API_KEY=your_api_key
     TWITTER_API_SECRET=your_api_secret
     TWITTER_ACCESS_TOKEN=your_access_token
     TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret" > .env
     # Start the server
     npm start
     ```
6. Use with an MCP client, such as Claude.
## Claude Configuration
To use this server with Claude, you'll need to set up the MCP configuration. Here's an example of how the configuration structure should look:
```json
{
  "name": "x",
  "display_name": "X",
  "description": "X MCP allows Claude to interact with X (formerly Twitter)",
  "path": "path/to/twitter-mcp/dist/index.js",
  "startup": {
    "env": {
      "TWITTER_API_KEY": "your_api_key",
      "TWITTER_API_SECRET": "your_api_secret",
      "TWITTER_ACCESS_TOKEN": "your_access_token",
      "TWITTER_ACCESS_TOKEN_SECRET": "your_access_token_secret"
    }
  },
  "transport": "stdio"
}
```
Save this configuration in your Claude MCP config directory, typically located at:
- Windows: `%APPDATA%\AnthropicClaude\mcp-servers`
- macOS: `~/Library/Application Support/AnthropicClaude/mcp-servers`
- Linux: `~/.config/AnthropicClaude/mcp-servers`
## Features
### Post Operations
- Post content (up to 280 characters)
- Search for posts by query with customizable result count
### Account Management
- Get profile information for any user or the authenticated account
- Update profile details (name, bio, location, website URL)
- Follow and unfollow users
- List followers for any user or the authenticated account
- List accounts that a user is following
### List Management
- Create new lists (public or private)
- Get information about specific lists
- Retrieve all lists owned by the authenticated user
## Available MCP Tools
| Tool Name | Description |
|-----------|-------------|
| `post_tweet` | Post new content to X |
| `search_tweets` | Search for content on X |
| `get_profile` | Get profile information for a user or the authenticated account |
| `update_profile` | Update the authenticated user's profile |
| `follow_user` | Follow a user |
| `unfollow_user` | Unfollow a user |
| `list_followers` | List followers of a user or the authenticated account |
| `list_following` | List accounts that a user or the authenticated account is following |
| `create_list` | Create a new list |
| `get_list_info` | Get information about a list |
| `get_user_lists` | Get all lists owned by the authenticated user |
## Requirements
- Node.js 18.x or higher
- X Developer account with API keys
- X API v1.1 and v2 access
## Environment Variables
| Variable | Description |
|----------|-------------|
| `TWITTER_API_KEY` | Your API key |
| `TWITTER_API_SECRET` | Your API secret |
| `TWITTER_ACCESS_TOKEN` | Your access token |
| `TWITTER_ACCESS_TOKEN_SECRET` | Your access token secret |
## Repository Structure
```
twitter-mcp/
├── .github/
│   └── workflows/
│       ├── publish.yml
│       └── release.yml
├── code/
│   ├── account_management.py  # Sample Python code for account management
│   ├── post_tweet.py          # Sample Python code for posting content
│   └── retrieve_tweets.py     # Sample Python code for retrieving content
├── src/
│   ├── index.ts               # Main entry point
│   ├── twitter-api.ts         # X API client
│   ├── formatter.ts           # Response formatter
│   └── types.ts               # Type definitions
├── .env.example
├── .gitignore
├── Dockerfile
├── LICENSE
├── package.json
├── README.md
└── tsconfig.json
```
## License
MIT
```
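For orientation, an MCP client invokes one of the tools listed in the README by sending a JSON-RPC 2.0 `tools/call` request over the configured transport. The sketch below builds such a request for `post_tweet`. The `text` argument name is taken from `PostTweetSchema` in `src/types.ts`; `buildToolCall` and `ToolCallRequest` are hypothetical names used only for illustration, and the exact arguments the server accepts depend on `src/index.ts`, which is not reproduced in this listing.

```typescript
// Shape of an MCP tools/call request (JSON-RPC 2.0).
interface ToolCallRequest {
  jsonrpc: '2.0';
  id: number;
  method: 'tools/call';
  params: { name: string; arguments: Record<string, unknown> };
}

// Hypothetical helper: wraps a tool name and its arguments in a request object.
function buildToolCall(
  id: number,
  name: string,
  args: Record<string, unknown>
): ToolCallRequest {
  return { jsonrpc: '2.0', id, method: 'tools/call', params: { name, arguments: args } };
}

// Assumed argument name `text`, mirroring PostTweetSchema.
const request = buildToolCall(1, 'post_tweet', { text: 'Hello from MCP!' });
console.log(JSON.stringify(request));
```

In normal use a client such as Claude constructs these messages itself; the sketch is only meant to show what travels over the `stdio` transport.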
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
```json
{
  "name": "twitter-mcp",
  "version": "1.0.0",
  "description": "Model Context Protocol server for Twitter integration",
  "main": "dist/index.js",
  "bin": {
    "twitter-mcp": "dist/index.js"
  },
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js",
    "dev": "ts-node src/index.ts",
    "prepublishOnly": "npm run build"
  },
  "files": [
    "dist",
    "README.md",
    "LICENSE"
  ],
  "keywords": [
    "mcp",
    "twitter",
    "api",
    "model",
    "context",
    "protocol"
  ],
  "author": "Dishant27",
  "license": "MIT",
  "dependencies": {
    "@modelcontextprotocol/sdk": "1.2.0",
    "dotenv": "^16.4.5",
    "twitter-api-v2": "^1.16.0"
  },
  "devDependencies": {
    "ts-node": "^10.9.2",
    "typescript": "^5.3.3"
  }
}
```
--------------------------------------------------------------------------------
/src/types.ts:
--------------------------------------------------------------------------------
```typescript
import z from 'zod';
// Base configuration
export const ConfigSchema = z.object({
apiKey: z.string(),
apiSecretKey: z.string(),
accessToken: z.string(),
accessTokenSecret: z.string(),
});
export type Config = z.infer<typeof ConfigSchema>;
// Twitter API error
export class TwitterError extends Error {
constructor(
message: string,
public readonly code: number = 0,
public readonly data?: any
) {
super(message);
this.name = 'TwitterError';
}
static isRateLimit(error: TwitterError): boolean {
return error.code === 88 || error.message.includes('rate limit');
}
}
// Schemas for tool inputs
export const PostTweetSchema = z.object({
text: z.string().max(280),
});
export const SearchTweetsSchema = z.object({
query: z.string(),
count: z.number().min(10).max(100),
});
// Account Management Schemas
export const GetProfileSchema = z.object({
username: z.string().optional(),
});
export const UpdateProfileSchema = z.object({
name: z.string().max(50).optional(),
description: z.string().max(160).optional(),
location: z.string().max(30).optional(),
url: z.string().url().max(100).optional(),
}).refine(
data => Object.keys(data).length > 0,
{ message: "At least one profile field must be provided" }
);
export const FollowUserSchema = z.object({
username: z.string(),
});
export const UnfollowUserSchema = z.object({
username: z.string(),
});
export const ListFollowersSchema = z.object({
username: z.string().optional(),
count: z.number().min(1).max(200).default(20),
});
export const ListFollowingSchema = z.object({
username: z.string().optional(),
count: z.number().min(1).max(200).default(20),
});
export const CreateListSchema = z.object({
name: z.string().max(25),
description: z.string().max(100).optional(),
private: z.boolean().default(false),
});
export const ListInfoSchema = z.object({
listId: z.string(),
});
// Types for Twitter responses
export interface TwitterUser {
id: string;
name: string;
username: string;
description?: string;
profileImageUrl?: string;
verified: boolean;
followersCount: number;
followingCount: number;
createdAt: string;
}
export interface Tweet {
id: string;
text: string;
authorId: string;
createdAt: string;
publicMetrics: {
retweetCount: number;
replyCount: number;
likeCount: number;
quoteCount: number;
};
}
export interface TwitterList {
id: string;
name: string;
description: string;
memberCount: number;
followerCount: number;
private: boolean;
ownerId: string;
}
```
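The four environment variables listed in the README line up one-to-one with the fields of `ConfigSchema`. The sketch below shows one way that mapping could be done. It is an illustration, not the repository's code (`src/index.ts` is not reproduced in this listing): `loadConfigFromEnv`, `ENV_NAMES`, and the local `Config` interface are hypothetical, and the check is written in plain TypeScript instead of zod so the block stands alone.

```typescript
// Local copy of the Config shape declared by ConfigSchema in src/types.ts.
interface Config {
  apiKey: string;
  apiSecretKey: string;
  accessToken: string;
  accessTokenSecret: string;
}

// Environment variable name for each Config field (names taken from the README).
const ENV_NAMES: Record<keyof Config, string> = {
  apiKey: 'TWITTER_API_KEY',
  apiSecretKey: 'TWITTER_API_SECRET',
  accessToken: 'TWITTER_ACCESS_TOKEN',
  accessTokenSecret: 'TWITTER_ACCESS_TOKEN_SECRET',
};

// Hypothetical helper: builds a Config from an environment-like object and
// reports every missing variable in a single error.
function loadConfigFromEnv(env: Record<string, string | undefined>): Config {
  const missing: string[] = [];
  const values: Partial<Config> = {};
  for (const field of Object.keys(ENV_NAMES) as (keyof Config)[]) {
    const value = env[ENV_NAMES[field]];
    if (value === undefined || value === '') {
      missing.push(ENV_NAMES[field]);
    } else {
      values[field] = value;
    }
  }
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return values as Config;
}
```

Calling `loadConfigFromEnv(process.env)` at startup would fail fast and name every unset variable at once, rather than surfacing an authentication error on the first API call.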
--------------------------------------------------------------------------------
/code/retrieve_tweets.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Sample code to retrieve tweets using Twitter API
"""

import tweepy
import json


def setup_twitter_client():
    """
    Setup Twitter API client with authentication
    """
    # Replace these placeholder values with your actual Twitter API credentials
    consumer_key = "YOUR_CONSUMER_KEY"
    consumer_secret = "YOUR_CONSUMER_SECRET"
    access_token = "YOUR_ACCESS_TOKEN"
    access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

    # Authenticate to Twitter
    auth = tweepy.OAuth1UserHandler(
        consumer_key, consumer_secret, access_token, access_token_secret
    )

    # Create API object
    api = tweepy.API(auth)
    return api


def get_user_timeline(api, username, count=10):
    """
    Retrieve recent tweets from a user's timeline

    Args:
        api: Authenticated tweepy API object
        username: Twitter username to retrieve tweets from
        count: Number of tweets to retrieve (default: 10)

    Returns:
        List of tweets
    """
    try:
        tweets = api.user_timeline(screen_name=username, count=count, tweet_mode="extended")
        return tweets
    except Exception as e:
        print(f"Error retrieving tweets for {username}: {e}")
        return []


def search_tweets(api, query, count=10):
    """
    Search for tweets matching a query

    Args:
        api: Authenticated tweepy API object
        query: Search query string
        count: Number of tweets to retrieve (default: 10)

    Returns:
        List of tweets matching the query
    """
    try:
        tweets = api.search_tweets(q=query, count=count, tweet_mode="extended")
        return tweets
    except Exception as e:
        print(f"Error searching for tweets with query '{query}': {e}")
        return []


def display_tweets(tweets):
    """
    Pretty print tweet information
    """
    for tweet in tweets:
        print(f"\n{'=' * 50}")
        print(f"User: @{tweet.user.screen_name}")
        print(f"Tweet ID: {tweet.id}")
        print(f"Created at: {tweet.created_at}")
        print(f"Content: {tweet.full_text}")
        print(f"Retweets: {tweet.retweet_count}, Likes: {tweet.favorite_count}")


def main():
    # Setup Twitter client
    api = setup_twitter_client()

    # Example 1: Get tweets from a user's timeline
    username = "twitter"
    print(f"\nRetrieving recent tweets from @{username}...")
    user_tweets = get_user_timeline(api, username, count=5)
    display_tweets(user_tweets)

    # Example 2: Search for tweets based on a query
    search_query = "#Python"
    print(f"\nSearching for tweets with query '{search_query}'...")
    search_results = search_tweets(api, search_query, count=5)
    display_tweets(search_results)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/code/post_tweet.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Sample code to post tweets using Twitter API
"""

import tweepy
import os
from datetime import datetime


def setup_twitter_client():
    """
    Setup Twitter API client with authentication
    """
    # Replace these placeholder values with your actual Twitter API credentials
    consumer_key = "YOUR_CONSUMER_KEY"
    consumer_secret = "YOUR_CONSUMER_SECRET"
    access_token = "YOUR_ACCESS_TOKEN"
    access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

    # Authenticate to Twitter
    auth = tweepy.OAuth1UserHandler(
        consumer_key, consumer_secret, access_token, access_token_secret
    )

    # Create API object
    api = tweepy.API(auth)
    return api


def post_tweet(api, text):
    """
    Post a simple text tweet

    Args:
        api: Authenticated tweepy API object
        text: Tweet content (max 280 characters)

    Returns:
        Posted tweet object if successful, None otherwise
    """
    try:
        if len(text) > 280:
            print(f"Tweet is too long ({len(text)} characters). Maximum is 280 characters.")
            return None

        tweet = api.update_status(text)
        print(f"Tweet posted successfully! Tweet ID: {tweet.id}")
        return tweet
    except Exception as e:
        print(f"Error posting tweet: {e}")
        return None


def post_tweet_with_media(api, text, media_path):
    """
    Post a tweet with attached media (image, gif, or video)

    Args:
        api: Authenticated tweepy API object
        text: Tweet content
        media_path: Path to media file to upload

    Returns:
        Posted tweet object if successful, None otherwise
    """
    try:
        if not os.path.exists(media_path):
            print(f"Media file not found: {media_path}")
            return None

        # Upload media to Twitter
        media = api.media_upload(media_path)

        # Post tweet with media
        tweet = api.update_status(text, media_ids=[media.media_id])
        print(f"Tweet with media posted successfully! Tweet ID: {tweet.id}")
        return tweet
    except Exception as e:
        print(f"Error posting tweet with media: {e}")
        return None


def reply_to_tweet(api, tweet_id, reply_text):
    """
    Reply to an existing tweet

    Args:
        api: Authenticated tweepy API object
        tweet_id: ID of the tweet to reply to
        reply_text: Content of the reply

    Returns:
        Posted reply tweet object if successful, None otherwise
    """
    try:
        reply = api.update_status(
            status=reply_text,
            in_reply_to_status_id=tweet_id,
            auto_populate_reply_metadata=True
        )
        print(f"Reply posted successfully! Reply ID: {reply.id}")
        return reply
    except Exception as e:
        print(f"Error posting reply: {e}")
        return None


def main():
    # Setup Twitter client
    api = setup_twitter_client()

    # Example 1: Post a simple text tweet
    tweet_text = f"Testing the Twitter API with Python! #Python #TwitterAPI (posted at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')})"
    tweet = post_tweet(api, tweet_text)

    if tweet:
        # Example 2: Reply to our own tweet
        reply_text = "This is a reply to my previous tweet using Python!"
        reply_to_tweet(api, tweet.id, reply_text)

    # Example 3: Post a tweet with media (uncomment to test)
    # media_path = "path/to/your/image.jpg"  # Replace with an actual path
    # post_tweet_with_media(api, "Check out this image! #Python", media_path)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/formatter.ts:
--------------------------------------------------------------------------------
```typescript
import { Tweet, TwitterUser, TwitterList } from './types';
export class ResponseFormatter {
/**
* Format search response
*/
static formatSearchResponse(
query: string,
tweets: Tweet[],
users: Record<string, TwitterUser>
): string {
let response = `Search results for "${query}":\n\n`;
if (tweets.length === 0) {
return response + 'No tweets found matching your query.';
}
tweets.forEach((tweet, index) => {
const user = users[tweet.authorId];
response += `${index + 1}. @${user?.username || 'Unknown'} `;
if (user?.verified) response += '✓ ';
response += `(${user?.name || 'Unknown'}):\n`;
response += `${tweet.text}\n`;
response += `🔁 ${tweet.publicMetrics.retweetCount} | ❤️ ${tweet.publicMetrics.likeCount} | 💬 ${tweet.publicMetrics.replyCount}\n`;
response += `🔗 https://twitter.com/${user?.username}/status/${tweet.id}\n\n`;
});
return response;
}
/**
* Format user profile
*/
static formatUserProfile(user: TwitterUser): string {
let response = `Profile for @${user.username} `;
if (user.verified) response += '✓';
response += `\n\n`;
response += `Name: ${user.name}\n`;
if (user.description) response += `Bio: ${user.description}\n`;
response += `Followers: ${user.followersCount.toLocaleString()} | Following: ${user.followingCount.toLocaleString()}\n`;
response += `Account created: ${new Date(user.createdAt).toLocaleDateString()}\n`;
response += `🔗 https://twitter.com/${user.username}\n`;
return response;
}
/**
* Format users list (followers or following)
*/
static formatUsersList(users: TwitterUser[], listType: 'followers' | 'following'): string {
const title = listType === 'followers' ? 'Followers' : 'Following';
let response = `${title} (${users.length}):\n\n`;
if (users.length === 0) {
return response + `No ${listType.toLowerCase()} found.`;
}
users.forEach((user, index) => {
response += `${index + 1}. @${user.username} `;
if (user.verified) response += '✓ ';
response += `(${user.name})\n`;
if (user.description) {
// Truncate description if too long
const desc = user.description.length > 50
? user.description.substring(0, 47) + '...'
: user.description;
response += ` ${desc}\n`;
}
response += ` Followers: ${user.followersCount.toLocaleString()}\n\n`;
});
return response;
}
/**
* Format Twitter lists
*/
static formatLists(lists: TwitterList[]): string {
let response = `Twitter Lists (${lists.length}):\n\n`;
if (lists.length === 0) {
return response + 'No lists found.';
}
lists.forEach((list, index) => {
response += `${index + 1}. ${list.name} ${list.private ? '🔒' : ''}\n`;
if (list.description) response += ` ${list.description}\n`;
response += ` Members: ${list.memberCount.toLocaleString()} | Followers: ${list.followerCount.toLocaleString()}\n\n`;
});
return response;
}
/**
* Format Twitter list info
*/
static formatListInfo(list: TwitterList): string {
let response = `List: ${list.name} ${list.private ? '🔒' : ''}\n\n`;
if (list.description) response += `Description: ${list.description}\n`;
response += `Privacy: ${list.private ? 'Private' : 'Public'}\n`;
response += `Members: ${list.memberCount.toLocaleString()}\n`;
response += `Followers: ${list.followerCount.toLocaleString()}\n`;
response += `🔗 https://twitter.com/i/lists/${list.id}\n`;
return response;
}
/**
* Format response for MCP
*/
static toMcpResponse(text: string): string {
return text;
}
}
```
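`formatUsersList` shortens long bios inline: anything over 50 characters becomes the first 47 characters plus `...`. If the same rule were needed in other formatters, it could be factored into a helper along these lines. This is a sketch only; `truncate` is a hypothetical name, and the repository keeps the logic inline.

```typescript
// Hypothetical helper mirroring the inline truncation in formatUsersList.
// Returns the text unchanged when it fits. Otherwise cuts it so that the
// result, including the ellipsis, is exactly maxLength characters long.
// Assumes maxLength is at least 3 (the length of the ellipsis).
function truncate(text: string, maxLength: number = 50): string {
  const ellipsis = '...';
  if (text.length <= maxLength) return text;
  return text.substring(0, maxLength - ellipsis.length) + ellipsis;
}
```

Like the original, this counts UTF-16 code units, so a bio containing emoji can be cut in the middle of a surrogate pair.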
--------------------------------------------------------------------------------
/src/twitter-api.ts:
--------------------------------------------------------------------------------
```typescript
import Twitter from 'twitter-api-v2';
import { Config, TwitterError, TwitterUser, Tweet, TwitterList } from './types';
export class TwitterClient {
private client: Twitter;
constructor(config: Config) {
this.client = new Twitter({
appKey: config.apiKey,
appSecret: config.apiSecretKey,
accessToken: config.accessToken,
accessSecret: config.accessTokenSecret,
});
}
/**
* Post a new tweet
*/
async postTweet(text: string): Promise<{ id: string }> {
try {
const { data } = await this.client.v2.tweet(text);
return { id: data.id };
} catch (error) {
this.handleTwitterError(error);
throw error; // TypeScript needs this
}
}
/**
* Search for tweets
*/
async searchTweets(query: string, count: number): Promise<{ tweets: Tweet[]; users: Record<string, TwitterUser> }> {
try {
const result = await this.client.v2.search(query, {
max_results: count,
'tweet.fields': 'created_at,public_metrics,author_id',
'user.fields': 'profile_image_url,description,created_at,verified,public_metrics',
expansions: 'author_id',
});
// The v2 API omits `data` when nothing matches, so fall back to an empty list
const tweets: Tweet[] = (result.data.data ?? []).map(tweet => ({
id: tweet.id,
text: tweet.text,
authorId: tweet.author_id ?? '',
createdAt: tweet.created_at ?? '',
publicMetrics: {
retweetCount: tweet.public_metrics?.retweet_count || 0,
replyCount: tweet.public_metrics?.reply_count || 0,
likeCount: tweet.public_metrics?.like_count || 0,
quoteCount: tweet.public_metrics?.quote_count || 0,
},
}));
const users: Record<string, TwitterUser> = {};
result.includes.users?.forEach(user => {
users[user.id] = {
id: user.id,
name: user.name,
username: user.username,
description: user.description,
profileImageUrl: user.profile_image_url,
verified: user.verified || false,
followersCount: user.public_metrics?.followers_count || 0,
followingCount: user.public_metrics?.following_count || 0,
createdAt: user.created_at || '',
};
});
return { tweets, users };
} catch (error) {
this.handleTwitterError(error);
throw error; // TypeScript needs this
}
}
/**
* Get user profile information
*/
async getUserProfile(username?: string): Promise<TwitterUser> {
try {
const fields = 'profile_image_url,description,created_at,verified,public_metrics';
// Get current user profile if no username is provided
const user = username
? await this.client.v2.userByUsername(username, { 'user.fields': fields })
: await this.client.v2.me({ 'user.fields': fields });
const userData = user.data;
return {
id: userData.id,
name: userData.name,
username: userData.username,
description: userData.description,
profileImageUrl: userData.profile_image_url,
verified: userData.verified || false,
followersCount: userData.public_metrics?.followers_count || 0,
followingCount: userData.public_metrics?.following_count || 0,
createdAt: userData.created_at || '',
};
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Update user profile
*/
async updateProfile(profileData: {
name?: string;
description?: string;
location?: string;
url?: string;
}): Promise<TwitterUser> {
try {
const result = await this.client.v1.updateAccountProfile(profileData);
return {
id: result.id_str,
name: result.name,
username: result.screen_name,
description: result.description,
profileImageUrl: result.profile_image_url_https,
verified: result.verified,
followersCount: result.followers_count,
followingCount: result.friends_count,
createdAt: result.created_at,
};
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Follow a user
*/
async followUser(username: string): Promise<TwitterUser> {
try {
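// v2.follow expects the authenticated user's ID first, then the target user's ID
const result = await this.client.v2.follow(
await this.getCurrentUserId(),
await this.getUserIdByUsername(username)
);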
if (!result.data.following) {
throw new TwitterError('Failed to follow user');
}
return this.getUserProfile(username);
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Unfollow a user
*/
async unfollowUser(username: string): Promise<TwitterUser> {
try {
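// v2.unfollow expects the authenticated user's ID first, then the target user's ID
const result = await this.client.v2.unfollow(
await this.getCurrentUserId(),
await this.getUserIdByUsername(username)
);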
if (!result.data.following) {
return this.getUserProfile(username);
} else {
throw new TwitterError('Failed to unfollow user');
}
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Get followers of a user
*/
async getFollowers(username?: string, count: number = 20): Promise<TwitterUser[]> {
try {
const userId = username
? await this.getUserIdByUsername(username)
: (await this.client.v2.me()).data.id;
const result = await this.client.v2.followers(userId, {
max_results: count,
'user.fields': 'profile_image_url,description,created_at,verified,public_metrics',
});
return result.data.map(user => ({
id: user.id,
name: user.name,
username: user.username,
description: user.description,
profileImageUrl: user.profile_image_url,
verified: user.verified || false,
followersCount: user.public_metrics?.followers_count || 0,
followingCount: user.public_metrics?.following_count || 0,
createdAt: user.created_at || '',
}));
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Get users that a user is following
*/
async getFollowing(username?: string, count: number = 20): Promise<TwitterUser[]> {
try {
const userId = username
? await this.getUserIdByUsername(username)
: (await this.client.v2.me()).data.id;
const result = await this.client.v2.following(userId, {
max_results: count,
'user.fields': 'profile_image_url,description,created_at,verified,public_metrics',
});
return result.data.map(user => ({
id: user.id,
name: user.name,
username: user.username,
description: user.description,
profileImageUrl: user.profile_image_url,
verified: user.verified || false,
followersCount: user.public_metrics?.followers_count || 0,
followingCount: user.public_metrics?.following_count || 0,
createdAt: user.created_at || '',
}));
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Create a Twitter list
*/
async createList(name: string, description?: string, isPrivate: boolean = false): Promise<TwitterList> {
try {
const result = await this.client.v2.createList({
name,
description,
private: isPrivate,
});
return {
id: result.data.id,
name: result.data.name,
description: result.data.description || '',
memberCount: 0,
followerCount: 0,
private: result.data.private || false,
ownerId: await this.getCurrentUserId(),
};
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Get list information
*/
async getListInfo(listId: string): Promise<TwitterList> {
try {
const result = await this.client.v2.list(listId, {
'list.fields': 'follower_count,member_count,owner_id,private',
});
return {
id: result.data.id,
name: result.data.name,
description: result.data.description || '',
memberCount: result.data.member_count || 0,
followerCount: result.data.follower_count || 0,
private: result.data.private || false,
ownerId: result.data.owner_id || '',
};
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Get user lists
*/
async getUserLists(): Promise<TwitterList[]> {
try {
const userId = await this.getCurrentUserId();
const result = await this.client.v2.listsOwned(userId, {
'list.fields': 'follower_count,member_count,owner_id,private',
});
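// listsOwned returns a paginator: the lists sit under `data.data`, which is absent when the user owns none
return (result.data.data ?? []).map(list => ({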
id: list.id,
name: list.name,
description: list.description || '',
memberCount: list.member_count || 0,
followerCount: list.follower_count || 0,
private: list.private || false,
ownerId: list.owner_id || userId,
}));
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Helper: Get user ID by username
*/
private async getUserIdByUsername(username: string): Promise<string> {
try {
const result = await this.client.v2.userByUsername(username);
return result.data.id;
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Helper: Get current user ID
*/
private async getCurrentUserId(): Promise<string> {
try {
const result = await this.client.v2.me();
return result.data.id;
} catch (error) {
this.handleTwitterError(error);
throw error;
}
}
/**
* Error handler
*/
private handleTwitterError(error: any): never {
console.error('Twitter API error:', error);
// Handle rate limiting
if (error.code === 88 || (error.errors && error.errors[0]?.code === 88)) {
throw new TwitterError('Twitter rate limit exceeded', 88, error);
}
// Handle auth errors
if ([32, 89, 135, 215, 226].includes(error.code) ||
(error.errors && [32, 89, 135, 215, 226].includes(error.errors[0]?.code))) {
throw new TwitterError('Twitter authentication error', error.code || error.errors?.[0]?.code || 0, error);
}
// For all other errors
const message = error.message ||
(error.errors && error.errors[0]?.message) ||
'Unknown Twitter API error';
const code = error.code ||
(error.errors && error.errors[0]?.code) ||
0;
throw new TwitterError(message, code, error);
}
}
```
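`handleTwitterError` sorts failures into three groups by numeric code: 88 for rate limiting, a fixed set of codes treated as authentication problems, and everything else. The same decision can be written as a pure function, which makes the grouping easy to unit-test without a live client. This is a sketch under stated assumptions: `classifyTwitterError`, `ErrorKind`, and `ApiErrorLike` are hypothetical names, and the input shape is inferred from how `handleTwitterError` reads `error.code` and `error.errors[0].code`.

```typescript
type ErrorKind = 'rate_limit' | 'auth' | 'other';

// Assumed error shape: a top-level code and/or an array of nested errors.
interface ApiErrorLike {
  code?: number;
  message?: string;
  errors?: Array<{ code?: number; message?: string }>;
}

// Codes copied from handleTwitterError.
const RATE_LIMIT_CODE = 88;
const AUTH_CODES = [32, 89, 135, 215, 226];

// Hypothetical helper: applies the same checks as handleTwitterError, in the
// same order, but returns a label instead of throwing.
function classifyTwitterError(error: ApiErrorLike): ErrorKind {
  // Look at both the top-level code and the first nested error's code.
  const codes = [error.code, error.errors?.[0]?.code];
  if (codes.includes(RATE_LIMIT_CODE)) return 'rate_limit';
  if (codes.some(code => code !== undefined && AUTH_CODES.includes(code))) return 'auth';
  return 'other';
}
```

A caller could use the label to decide whether a retry is worthwhile: waiting may help for `rate_limit`, whereas `auth` usually needs new credentials.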
--------------------------------------------------------------------------------
/code/account_management.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Sample code for Twitter account management operations
"""
import tweepy
import os
from PIL import Image
import io
def setup_twitter_client():
"""
Setup Twitter API client with authentication
"""
# Replace these placeholder values with your actual Twitter API credentials
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
# Authenticate to Twitter
auth = tweepy.OAuth1UserHandler(
consumer_key, consumer_secret, access_token, access_token_secret
)
# Create API object
api = tweepy.API(auth)
return api
def get_account_info(api):
"""
Get information about the authenticated user's account
Args:
api: Authenticated tweepy API object
Returns:
User object containing account information
"""
try:
me = api.verify_credentials()
print(f"Account information for @{me.screen_name}:")
print(f" - Display name: {me.name}")
print(f" - Bio: {me.description}")
print(f" - Location: {me.location}")
print(f" - Following: {me.friends_count}, Followers: {me.followers_count}")
print(f" - Tweets: {me.statuses_count}")
print(f" - Account created: {me.created_at}")
return me
except Exception as e:
print(f"Error getting account information: {e}")
return None
def update_profile(api, **kwargs):
"""
Update the authenticated user's profile information
Args:
api: Authenticated tweepy API object
kwargs: Optional fields to update (name, description, location, url)
Returns:
Updated user object if successful, None otherwise
"""
try:
# Only pass parameters that are provided
update_params = {}
if 'name' in kwargs:
update_params['name'] = kwargs['name']
if 'description' in kwargs:
update_params['description'] = kwargs['description']
if 'location' in kwargs:
update_params['location'] = kwargs['location']
if 'url' in kwargs:
update_params['url'] = kwargs['url']
# Update profile
updated_user = api.update_profile(**update_params)
print(f"Profile updated successfully for @{updated_user.screen_name}")
return updated_user
except Exception as e:
print(f"Error updating profile: {e}")
return None
def update_profile_image(api, image_path):
"""
Update the authenticated user's profile image
Args:
api: Authenticated tweepy API object
image_path: Path to the new profile image file
Returns:
Updated user object if successful, None otherwise
"""
try:
if not os.path.exists(image_path):
print(f"Image file not found: {image_path}")
return None
# Check image size and format
with Image.open(image_path) as img:
width, height = img.size
print(f"Image dimensions: {width}x{height}")
# Twitter recommends 400x400 pixels for profile images
if width < 400 or height < 400:
print("Warning: Twitter recommends profile images of at least 400x400 pixels")
# Update profile image
updated_user = api.update_profile_image(filename=image_path)
print(f"Profile image updated successfully for @{updated_user.screen_name}")
return updated_user
except Exception as e:
print(f"Error updating profile image: {e}")
return None
def update_profile_banner(api, banner_path):
"""
Update the authenticated user's profile banner
Args:
api: Authenticated tweepy API object
banner_path: Path to the new banner image file
Returns:
True if successful, False otherwise
"""
try:
if not os.path.exists(banner_path):
print(f"Banner file not found: {banner_path}")
return False
# Check image size and format
with Image.open(banner_path) as img:
width, height = img.size
print(f"Banner dimensions: {width}x{height}")
# Twitter recommends 1500x500 pixels for banners
if width < 1500 or height < 500:
print("Warning: Twitter recommends banner images of 1500x500 pixels")
# Update profile banner
api.update_profile_banner(filename=banner_path)
print("Profile banner updated successfully")
return True
except Exception as e:
print(f"Error updating profile banner: {e}")
return False
def get_followers(api, count=20):
    """
    Get a list of users following the authenticated user

    Args:
        api: Authenticated tweepy API object
        count: Number of followers to retrieve (default: 20)

    Returns:
        List of follower user objects
    """
    try:
        followers = api.get_followers(count=count)
        print(f"Retrieved {len(followers)} followers:")
        for i, follower in enumerate(followers, 1):
            print(f" {i}. @{follower.screen_name} - {follower.name}")
        return followers
    except Exception as e:
        print(f"Error retrieving followers: {e}")
        return []


def get_following(api, count=20):
    """
    Get a list of users that the authenticated user is following

    Args:
        api: Authenticated tweepy API object
        count: Number of following users to retrieve (default: 20)

    Returns:
        List of following user objects
    """
    try:
        following = api.get_friends(count=count)
        print(f"Retrieved {len(following)} accounts you are following:")
        for i, friend in enumerate(following, 1):
            print(f" {i}. @{friend.screen_name} - {friend.name}")
        return following
    except Exception as e:
        print(f"Error retrieving following accounts: {e}")
        return []


def follow_user(api, username):
    """
    Follow a specified user

    Args:
        api: Authenticated tweepy API object
        username: Screen name of the user to follow

    Returns:
        Followed user object if successful, None otherwise
    """
    try:
        user = api.create_friendship(screen_name=username)
        print(f"Successfully followed @{user.screen_name}")
        return user
    except Exception as e:
        print(f"Error following user @{username}: {e}")
        return None


def unfollow_user(api, username):
    """
    Unfollow a specified user

    Args:
        api: Authenticated tweepy API object
        username: Screen name of the user to unfollow

    Returns:
        Unfollowed user object if successful, None otherwise
    """
    try:
        user = api.destroy_friendship(screen_name=username)
        print(f"Successfully unfollowed @{user.screen_name}")
        return user
    except Exception as e:
        print(f"Error unfollowing user @{username}: {e}")
        return None


def create_list(api, name, description, private=False):
    """
    Create a new Twitter list

    Args:
        api: Authenticated tweepy API object
        name: Name of the list
        description: Description of the list
        private: Whether the list should be private (default: False)

    Returns:
        Created list object if successful, None otherwise
    """
    try:
        new_list = api.create_list(
            name=name,
            description=description,
            mode='private' if private else 'public'
        )
        print(f"List '{new_list.name}' created successfully")
        return new_list
    except Exception as e:
        print(f"Error creating list: {e}")
        return None


def get_lists(api):
    """
    Get the lists of the authenticated user

    Note: tweepy's API.get_lists returns the lists the user subscribes to,
    including the lists they own, so the result is not limited to owned lists.

    Args:
        api: Authenticated tweepy API object

    Returns:
        List of list objects (owned and subscribed)
    """
    try:
        owned_lists = api.get_lists()
        print(f"Retrieved {len(owned_lists)} lists:")
        for i, lst in enumerate(owned_lists, 1):
            print(f" {i}. {lst.name} - {lst.description} ({lst.member_count} members)")
        return owned_lists
    except Exception as e:
        print(f"Error retrieving lists: {e}")
        return []


def main():
    # Setup Twitter client
    api = setup_twitter_client()

    # Example 1: Get account information
    print("\n=== Account Information ===")
    account_info = get_account_info(api)

    # Example 2: Update profile information
    print("\n=== Update Profile Information ===")
    print("Note: Commented out to prevent actual updates")
    # update_profile(
    #     api,
    #     name="Updated Name",
    #     description="This is an updated bio using Python Tweepy!",
    #     location="San Francisco, CA",
    #     url="https://example.com"
    # )

    # Example 3: Update profile image
    print("\n=== Update Profile Image ===")
    print("Note: Commented out to prevent actual updates")
    # profile_image_path = "path/to/profile/image.jpg"  # Replace with actual path
    # update_profile_image(api, profile_image_path)

    # Example 4: Update profile banner
    print("\n=== Update Profile Banner ===")
    print("Note: Commented out to prevent actual updates")
    # banner_image_path = "path/to/banner/image.jpg"  # Replace with actual path
    # update_profile_banner(api, banner_image_path)

    # Example 5: Get followers
    print("\n=== Get Followers ===")
    followers = get_followers(api, count=5)  # Limit to 5 for example

    # Example 6: Get accounts you're following
    print("\n=== Get Following ===")
    following = get_following(api, count=5)  # Limit to 5 for example

    # Example 7: Follow a user
    print("\n=== Follow User ===")
    print("Note: Commented out to prevent actual follow")
    # follow_user(api, "twitter")

    # Example 8: Unfollow a user
    print("\n=== Unfollow User ===")
    print("Note: Commented out to prevent actual unfollow")
    # unfollow_user(api, "twitter")

    # Example 9: Create a Twitter list
    print("\n=== Create Twitter List ===")
    print("Note: Commented out to prevent actual list creation")
    # create_list(api, "Python Developers", "A list of Python developers and organizations", private=False)

    # Example 10: Get all owned lists
    print("\n=== Get Owned Lists ===")
    owned_lists = get_lists(api)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------
```typescript
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
ListToolsRequestSchema,
CallToolRequestSchema,
Tool,
ErrorCode,
McpError,
TextContent
} from '@modelcontextprotocol/sdk/types.js';
import { TwitterClient } from './twitter-api.js';
import { ResponseFormatter } from './formatter.js';
import {
Config, ConfigSchema,
PostTweetSchema, SearchTweetsSchema,
GetProfileSchema, UpdateProfileSchema,
FollowUserSchema, UnfollowUserSchema,
ListFollowersSchema, ListFollowingSchema,
CreateListSchema, ListInfoSchema,
TwitterError
} from './types.js';
import dotenv from 'dotenv';
export class TwitterServer {
private server: Server;
private client: TwitterClient;
constructor(config: Config) {
// Validate config
const result = ConfigSchema.safeParse(config);
if (!result.success) {
throw new Error(`Invalid configuration: ${result.error.message}`);
}
this.client = new TwitterClient(config);
this.server = new Server({
name: 'twitter-mcp',
version: '1.0.0'
}, {
capabilities: {
tools: {}
}
});
this.setupHandlers();
}
private setupHandlers(): void {
// Error handler
this.server.onerror = (error) => {
console.error('[MCP Error]:', error);
};
// Graceful shutdown
process.on('SIGINT', async () => {
console.error('Shutting down server...');
await this.server.close();
process.exit(0);
});
// Register tool handlers
this.setupToolHandlers();
}
private setupToolHandlers(): void {
// List available tools
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
// Tweet operations
{
name: 'post_tweet',
description: 'Post a new tweet to Twitter',
inputSchema: {
type: 'object',
properties: {
text: {
type: 'string',
description: 'The content of your tweet',
maxLength: 280
}
},
required: ['text']
}
} as Tool,
{
name: 'search_tweets',
description: 'Search for tweets on Twitter',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'Search query'
},
count: {
type: 'number',
description: 'Number of tweets to return (10-100)',
minimum: 10,
maximum: 100
}
},
required: ['query', 'count']
}
} as Tool,
// Account management operations
{
name: 'get_profile',
description: 'Get Twitter profile information for a user or the authenticated account',
inputSchema: {
type: 'object',
properties: {
username: {
type: 'string',
description: 'Twitter username (if not provided, returns authenticated user profile)'
}
},
required: []
}
} as Tool,
{
name: 'update_profile',
description: 'Update the authenticated user\'s Twitter profile',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Display name (max 50 chars)'
},
description: {
type: 'string',
description: 'Bio (max 160 chars)'
},
location: {
type: 'string',
description: 'Location (max 30 chars)'
},
url: {
type: 'string',
description: 'Website URL (max 100 chars)'
}
},
required: []
}
} as Tool,
{
name: 'follow_user',
description: 'Follow a Twitter user',
inputSchema: {
type: 'object',
properties: {
username: {
type: 'string',
description: 'Twitter username to follow'
}
},
required: ['username']
}
} as Tool,
{
name: 'unfollow_user',
description: 'Unfollow a Twitter user',
inputSchema: {
type: 'object',
properties: {
username: {
type: 'string',
description: 'Twitter username to unfollow'
}
},
required: ['username']
}
} as Tool,
{
name: 'list_followers',
description: 'List followers of a Twitter user or the authenticated account',
inputSchema: {
type: 'object',
properties: {
username: {
type: 'string',
description: 'Twitter username (if not provided, returns authenticated user\'s followers)'
},
count: {
type: 'number',
description: 'Number of followers to return (1-200)',
minimum: 1,
maximum: 200,
default: 20
}
},
required: []
}
} as Tool,
{
name: 'list_following',
description: 'List accounts that a Twitter user or the authenticated account is following',
inputSchema: {
type: 'object',
properties: {
username: {
type: 'string',
description: 'Twitter username (if not provided, returns authenticated user\'s following)'
},
count: {
type: 'number',
description: 'Number of accounts to return (1-200)',
minimum: 1,
maximum: 200,
default: 20
}
},
required: []
}
} as Tool,
{
name: 'create_list',
description: 'Create a new Twitter list',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'List name (max 25 chars)'
},
description: {
type: 'string',
description: 'List description (max 100 chars)'
},
private: {
type: 'boolean',
description: 'Whether the list should be private (default: false)'
}
},
required: ['name']
}
} as Tool,
{
name: 'get_list_info',
description: 'Get information about a Twitter list',
inputSchema: {
type: 'object',
properties: {
listId: {
type: 'string',
description: 'Twitter list ID'
}
},
required: ['listId']
}
} as Tool,
{
name: 'get_user_lists',
description: 'Get all lists owned by the authenticated user',
inputSchema: {
type: 'object',
properties: {},
required: []
}
} as Tool
]
}));
// Handle tool execution
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
console.error(`Tool called: ${name}`, args);
try {
switch (name) {
// Tweet operations
case 'post_tweet':
return await this.handlePostTweet(args);
case 'search_tweets':
return await this.handleSearchTweets(args);
// Account management operations
case 'get_profile':
return await this.handleGetProfile(args);
case 'update_profile':
return await this.handleUpdateProfile(args);
case 'follow_user':
return await this.handleFollowUser(args);
case 'unfollow_user':
return await this.handleUnfollowUser(args);
case 'list_followers':
return await this.handleListFollowers(args);
case 'list_following':
return await this.handleListFollowing(args);
case 'create_list':
return await this.handleCreateList(args);
case 'get_list_info':
return await this.handleGetListInfo(args);
case 'get_user_lists':
return await this.handleGetUserLists(args);
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${name}`
);
}
} catch (error) {
return this.handleError(error);
}
});
}
// Tweet operations handlers
private async handlePostTweet(args: unknown) {
const result = PostTweetSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const tweet = await this.client.postTweet(result.data.text);
return {
content: [{
type: 'text',
text: `Tweet posted successfully!\nURL: https://twitter.com/i/web/status/${tweet.id}`
}] as TextContent[]
};
}
private async handleSearchTweets(args: unknown) {
const result = SearchTweetsSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const { tweets, users } = await this.client.searchTweets(
result.data.query,
result.data.count
);
const formattedResponse = ResponseFormatter.formatSearchResponse(
result.data.query,
tweets,
users
);
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
// Account management operations handlers
private async handleGetProfile(args: unknown) {
const result = GetProfileSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const profile = await this.client.getUserProfile(result.data.username);
const formattedResponse = ResponseFormatter.formatUserProfile(profile);
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleUpdateProfile(args: unknown) {
const result = UpdateProfileSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const updatedProfile = await this.client.updateProfile({
name: result.data.name,
description: result.data.description,
location: result.data.location,
url: result.data.url
});
const formattedResponse = `Profile updated successfully!\n\n${ResponseFormatter.formatUserProfile(updatedProfile)}`;
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleFollowUser(args: unknown) {
const result = FollowUserSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const user = await this.client.followUser(result.data.username);
const formattedResponse = `Successfully followed @${user.username}!\n\n${ResponseFormatter.formatUserProfile(user)}`;
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleUnfollowUser(args: unknown) {
const result = UnfollowUserSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const user = await this.client.unfollowUser(result.data.username);
const formattedResponse = `Successfully unfollowed @${user.username}!\n\n${ResponseFormatter.formatUserProfile(user)}`;
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleListFollowers(args: unknown) {
const result = ListFollowersSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const followers = await this.client.getFollowers(
result.data.username,
result.data.count
);
const formattedResponse = ResponseFormatter.formatUsersList(followers, 'followers');
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleListFollowing(args: unknown) {
const result = ListFollowingSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const following = await this.client.getFollowing(
result.data.username,
result.data.count
);
const formattedResponse = ResponseFormatter.formatUsersList(following, 'following');
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleCreateList(args: unknown) {
const result = CreateListSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const list = await this.client.createList(
result.data.name,
result.data.description,
result.data.private
);
const formattedResponse = `List "${list.name}" created successfully!\n\n${ResponseFormatter.formatListInfo(list)}`;
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleGetListInfo(args: unknown) {
const result = ListInfoSchema.safeParse(args);
if (!result.success) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid parameters: ${result.error.message}`
);
}
const list = await this.client.getListInfo(result.data.listId);
const formattedResponse = ResponseFormatter.formatListInfo(list);
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private async handleGetUserLists(_args: unknown) {
// No parameters needed for this endpoint
const lists = await this.client.getUserLists();
const formattedResponse = ResponseFormatter.formatLists(lists);
return {
content: [{
type: 'text',
text: ResponseFormatter.toMcpResponse(formattedResponse)
}] as TextContent[]
};
}
private handleError(error: unknown) {
if (error instanceof McpError) {
throw error;
}
if (error instanceof TwitterError) {
// isError is a field of the tool result itself, not of an individual content item
if (TwitterError.isRateLimit(error)) {
return {
content: [{
type: 'text',
text: 'Rate limit exceeded. Please wait a moment before trying again.'
}] as TextContent[],
isError: true
};
}
return {
content: [{
type: 'text',
text: `Twitter API error: ${(error as TwitterError).message}`
}] as TextContent[],
isError: true
};
}
console.error('Unexpected error:', error);
throw new McpError(
ErrorCode.InternalError,
'An unexpected error occurred'
);
}
async start(): Promise<void> {
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error('Twitter MCP server running on stdio');
}
}
// Start the server
dotenv.config();
const config = {
apiKey: process.env.TWITTER_API_KEY!,
apiSecretKey: process.env.TWITTER_API_SECRET!,
accessToken: process.env.TWITTER_ACCESS_TOKEN!,
accessTokenSecret: process.env.TWITTER_ACCESS_TOKEN_SECRET!
};
const server = new TwitterServer(config);
server.start().catch(error => {
console.error('Failed to start server:', error);
process.exit(1);
});
```
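The startup code in `/src/index.ts` reads the four `TWITTER_*` variables with non-null assertions and relies on `ConfigSchema` to reject missing values afterwards. As a minimal sketch, assuming one wanted a clearer startup message, the variables could be checked up front. The names `EnvSource`, `TwitterConfig`, `ENV_KEYS`, and `loadConfigFromEnv` are hypothetical and not part of this repository; only the environment variable names and config field names are taken from the file above.

```typescript
// Hypothetical helper, not part of the repository: collects every missing
// TWITTER_* variable and reports them together instead of one at a time.
type EnvSource = Record<string, string | undefined>;

// Mirrors the fields of the config object built in index.ts (assumed shape).
interface TwitterConfig {
  apiKey: string;
  apiSecretKey: string;
  accessToken: string;
  accessTokenSecret: string;
}

// Maps each config field to the environment variable index.ts reads.
const ENV_KEYS: Record<keyof TwitterConfig, string> = {
  apiKey: 'TWITTER_API_KEY',
  apiSecretKey: 'TWITTER_API_SECRET',
  accessToken: 'TWITTER_ACCESS_TOKEN',
  accessTokenSecret: 'TWITTER_ACCESS_TOKEN_SECRET'
};

// Returns a complete config, or throws a single error naming all missing variables.
function loadConfigFromEnv(env: EnvSource): TwitterConfig {
  const missing: string[] = [];
  const values: Partial<TwitterConfig> = {};
  for (const field of Object.keys(ENV_KEYS) as (keyof TwitterConfig)[]) {
    const value = env[ENV_KEYS[field]];
    if (value === undefined || value.trim() === '') {
      missing.push(ENV_KEYS[field]);
    } else {
      values[field] = value;
    }
  }
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return values as TwitterConfig;
}
```

In the server this could be called as `loadConfigFromEnv(process.env)` after `dotenv.config()`, with the result passed to `new TwitterServer(...)`. Taking the environment as a parameter keeps the helper easy to exercise without touching the real process environment.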