# Directory Structure ``` ├── .gitignore ├── .python-version ├── Dockerfile ├── gmail_api.py ├── gmail_mcp.log ├── gmail_server.py ├── gmail_token_creator.py ├── google_apis.py ├── LICENSE ├── pyproject.toml ├── read_emails.py ├── README.md ├── search_emails.py ├── send_emails.py ├── smithery.yaml └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.12 2 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python virtual environments 2 | venv/ 3 | .env 4 | 5 | # Token files and secrets 6 | client_secret.json 7 | token_files/ 8 | *.log 9 | __pycache__/ 10 | *.pyc 11 | 12 | # Mac/Windows files 13 | .DS_Store 14 | 15 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Gmail MCP Server 2 | [](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server) 3 | 4 | A powerful and flexible Gmail integration server built using the MCP (Message Control Protocol) framework. This server provides a robust interface to interact with Gmail APIs, offering functionality for reading, sending, and managing emails programmatically. 5 | 6 | ## Features 7 | 8 | - Read emails from multiple Gmail accounts 9 | - Send emails with attachments 10 | - Search emails with advanced query options 11 | - Download email attachments 12 | - Handle email conversations and threads 13 | - Real-time email monitoring 14 | - Support for multiple Gmail accounts 15 | 16 | ## Prerequisites 17 | 18 | Before running the Gmail MCP server, ensure you have the following: 19 | 20 | 1. Python 3.12 or higher 21 | 2. Google Cloud Project with Gmail API enabled 22 | 3. OAuth 2.0 Client ID credentials 23 | 4. Required Python packages (specified in pyproject.toml) 24 | 25 | ## Installation 26 | 27 | ### Installing via Smithery 28 | 29 | To install Gmail Integration Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server): 30 | 31 | ```bash 32 | npx -y @smithery/cli install @Quantum-369/Gmail-mcp-server --client claude 33 | ``` 34 | 35 | 1. Clone the repository: 36 | ```bash 37 | git clone <your-repository-url> 38 | cd gmail-mcp-server 39 | ``` 40 | 41 | 2. Create and activate a virtual environment: 42 | ```bash 43 | python -m venv venv 44 | # On Windows 45 | venv\Scripts\activate 46 | # On Unix/MacOS 47 | source venv/bin/activate 48 | ``` 49 | 50 | 3. Install dependencies: 51 | ```bash 52 | pip install . 53 | ``` 54 | 55 | ## Running on Ubuntu 56 | 57 | 1. Install Python 3.12 and create a virtual environment if you haven't already: 58 | ```bash 59 | sudo apt install python3.12 python3.12-venv -y 60 | python3.12 -m venv venv 61 | source venv/bin/activate 62 | ``` 63 | 2. Place your downloaded `client_secret.json` in the project root. 64 | 3. Generate an OAuth token for the Gmail account you want to use. Copy the URL 65 | printed by the script into a web browser and complete the sign-in process: 66 | ```bash 67 | python gmail_token_creator.py 68 | ``` 69 | 4. Start the server: 70 | ```bash 71 | python gmail_server.py 72 | ``` 73 | 74 | ## Setup Google Cloud Project 75 | 76 | 1. Go to the [Google Cloud Console](https://console.cloud.google.com/) 77 | 2. Create a new project or select an existing one 78 | 3. Enable the Gmail API for your project 79 | 4. Create OAuth 2.0 credentials: 80 | - Go to "APIs & Services" > "Credentials" 81 | - Click "Create Credentials" > "OAuth client ID" 82 | - Choose "Desktop app" as application type 83 | - Download the client configuration file 84 | 5. Rename the downloaded file to `client_secret.json` and place it in the project root directory 85 | 86 | ## Configuration 87 | 88 | 1. Set up email identifiers in `gmail_token_creator.py`: 89 | ```python 90 | email_identifier = '[email protected]' # Change this for each account 91 | ``` 92 | 93 | 2. Run the token creator to authenticate your Gmail accounts: 94 | ```bash 95 | python gmail_token_creator.py 96 | ``` 97 | The script prints an authorization URL. Copy this URL into your web browser, 98 | complete the Google consent flow, and copy the verification code back into the 99 | terminal if prompted. A token file will be created inside `token_files/` for 100 | future use. 101 | 102 | 3. Repeat the process for each Gmail account you want to integrate 103 | 104 | ## Server Structure 105 | 106 | - `gmail_server.py`: Main MCP server implementation 107 | - `gmail_api.py`: Gmail API interaction functions 108 | - `google_apis.py`: Google API authentication utilities 109 | - Supporting files: 110 | - `read_emails.py`: Email reading functionality 111 | - `search_emails.py`: Email search functionality 112 | - `send_emails.py`: Email sending functionality 113 | 114 | ## Usage 115 | 116 | ### Starting the Server 117 | 118 | ```bash 119 | python gmail_server.py 120 | ``` 121 | 122 | ### Running with Docker 123 | 124 | You can also build a container image using the provided `Dockerfile`: 125 | 126 | ```bash 127 | docker build -t gmail-mcp-server . 128 | docker run -v $(pwd)/client_secret.json:/app/client_secret.json \ 129 | -v $(pwd)/token_files:/app/token_files gmail-mcp-server 130 | ``` 131 | 132 | The container runs the same server and stores authentication tokens in the 133 | `token_files` directory on the host so they persist between runs. 134 | 135 | ### Available Tools 136 | 137 | 1. Send Email: 138 | ```python 139 | await send_gmail( 140 | email_identifier="[email protected]", 141 | to="[email protected]", 142 | subject="Test Subject", 143 | body="Email body content", 144 | attachment_paths=["path/to/attachment"] 145 | ) 146 | ``` 147 | 148 | 2. Search Emails: 149 | ```python 150 | await search_email_tool( 151 | email_identifier="[email protected]", 152 | query="from:[email protected]", 153 | max_results=30, 154 | include_conversations=True 155 | ) 156 | ``` 157 | 158 | 3. Read Latest Emails: 159 | ```python 160 | await read_latest_emails( 161 | email_identifier="[email protected]", 162 | max_results=5, 163 | download_attachments=False 164 | ) 165 | ``` 166 | 167 | 4. Download Attachments: 168 | ```python 169 | await download_email_attachments( 170 | email_identifier="[email protected]", 171 | msg_id="message_id", 172 | download_all_in_thread=False 173 | ) 174 | ``` 175 | 176 | ## Security Considerations 177 | 178 | - Store `client_secret.json` securely and never commit it to version control 179 | - Keep token files secure and add them to `.gitignore` 180 | - Use environment variables for sensitive information 181 | - Regularly rotate OAuth credentials 182 | - Monitor API usage and set appropriate quotas 183 | 184 | ## Error Handling 185 | 186 | The server includes comprehensive error handling and logging: 187 | - Logs are written to `gmail_mcp.log` 188 | - Both file and console logging are enabled 189 | - Detailed error messages for debugging 190 | 191 | ## Contributing 192 | 193 | 1. Fork the repository 194 | 2. Create a feature branch 195 | 3. Commit your changes 196 | 4. Push to the branch 197 | 5. Create a Pull Request 198 | 199 | ## License 200 | 201 | Apachelicense2.0 202 | 203 | ## Support 204 | 205 | For issues and feature requests, please use the GitHub issue tracker. 206 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "gmail-server" 3 | version = "0.1.0" 4 | description = "Add your description here" 5 | readme = "README.md" 6 | requires-python = ">=3.12" 7 | dependencies = [ 8 | "google-api-python-client>=2.160.0", 9 | "google-auth>=2.38.0", 10 | "google-auth-oauthlib>=1.2.1", 11 | "mcp[cli]>=1.2.1", 12 | ] 13 | ``` -------------------------------------------------------------------------------- /gmail_token_creator.py: -------------------------------------------------------------------------------- ```python 1 | 2 | # gmail_api.py 3 | from google_apis import create_service 4 | 5 | client_secret_file = 'client_secret.json' 6 | API_SERVICE_NAME = 'gmail' 7 | API_VERSION = 'v1' 8 | SCOPES = ['https://mail.google.com/'] 9 | 10 | # Add an identifier for each account, like the email address 11 | email_identifier = '[email protected]' # Change this for each account 12 | #[email protected] 13 | #[email protected] 14 | #[email protected] 15 | #[email protected] 16 | #[email protected] 17 | 18 | # Pass the email identifier as a prefix to create unique token files 19 | service = create_service(client_secret_file, API_SERVICE_NAME, API_VERSION, SCOPES, prefix=f'_{email_identifier}') 20 | #print(dir(service)) 21 | ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml 1 | # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml 2 | 3 | startCommand: 4 | type: stdio 5 | configSchema: 6 | # JSON Schema defining the configuration options for the MCP. 7 | type: object 8 | required: 9 | - clientSecretPath 10 | - emailIdentifier 11 | properties: 12 | clientSecretPath: 13 | type: string 14 | description: Path to the OAuth 2.0 client secret JSON file. 15 | emailIdentifier: 16 | type: string 17 | description: Email identifier for the Gmail account. 18 | commandFunction: 19 | # A function that produces the CLI command to start the MCP on stdio. 20 | |- 21 | (config) => ({ command: 'python', args: ['gmail_server.py'], env: { CLIENT_SECRET_PATH: config.clientSecretPath, EMAIL_IDENTIFIER: config.emailIdentifier } }) 22 | ``` -------------------------------------------------------------------------------- /send_emails.py: -------------------------------------------------------------------------------- ```python 1 | from pathlib import Path 2 | from gmail_api import init_gmail_service, send_email 3 | 4 | # Configuration 5 | client_file = 'client_secret.json' 6 | email_identifier = '[email protected]' # Change this for each account 7 | 8 | # Initialize Gmail API service for the specific email account 9 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}') 10 | 11 | # Email details 12 | to_address = '[email protected]' 13 | email_subject = 'MCP servers document' 14 | email_body = 'This is a test email sent using the Gmail API.' 15 | 16 | # Attachments 17 | attachment_dir = Path('C:\\Users\\harsh\\Downloads\\MS projects\\MCP\\mcpdocs') 18 | attachment_files = list(attachment_dir.glob('*')) # Load all files from the attachments folder 19 | 20 | # Send the email 21 | response_email_sent = send_email( 22 | service=service, 23 | to=to_address, 24 | subject=email_subject, 25 | body=email_body, 26 | body_type='plain', 27 | attachment_paths=attachment_files 28 | ) 29 | 30 | # Output response 31 | print(response_email_sent) 32 | 33 | ``` -------------------------------------------------------------------------------- /search_emails.py: -------------------------------------------------------------------------------- ```python 1 | from gmail_api import init_gmail_service, get_email_message_details, search_emails, search_email_conversations 2 | 3 | client_file = 'client_secret.json' 4 | email_identifier = '[email protected]' # Specify the email identifier 5 | 6 | # Initialize Gmail API service 7 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}') 8 | 9 | query = 'from:me' 10 | email_messages = search_emails(service, query, max_results=30) 11 | email_messages += search_email_conversations(service, query, max_results=30) # Combine both 12 | 13 | for message in email_messages: 14 | email_detail = get_email_message_details(service, message['id']) 15 | 16 | if email_detail: # Check if email details were fetched successfully 17 | print(message['id']) 18 | print(f"Subject: {email_detail.get('subject', 'No Subject')}") 19 | print(f"Date: {email_detail.get('date', 'No Date')}") 20 | print(f"Label: {email_detail.get('label', 'No Label')}") 21 | print('Snippet: ', email_detail.get('snippet', 'No Snippet')) 22 | print(f"Body: {email_detail.get('body', 'No Body')}") 23 | print('-' * 50) 24 | else: 25 | print(f"Skipping message ID {message['id']} as it couldn't be retrieved.") 26 | 27 | ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile 1 | # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile 2 | # Use a Python image with uv pre-installed 3 | FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv 4 | 5 | # Install the project into /app 6 | WORKDIR /app 7 | 8 | # Enable bytecode compilation 9 | ENV UV_COMPILE_BYTECODE=1 10 | 11 | # Copy from the cache instead of linking since it's a mounted volume 12 | ENV UV_LINK_MODE=copy 13 | 14 | # Install the project's dependencies using the lockfile and settings 15 | RUN --mount=type=cache,target=/root/.cache/uv --mount=type=bind,source=uv.lock,target=uv.lock --mount=type=bind,source=pyproject.toml,target=pyproject.toml uv sync --frozen --no-install-project --no-dev --no-editable 16 | 17 | # Then, add the rest of the project source code and install it 18 | # Installing separately from its dependencies allows optimal layer caching 19 | ADD . /app 20 | RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev --no-editable 21 | 22 | FROM python:3.12-slim-bookworm 23 | 24 | WORKDIR /app 25 | 26 | COPY --from=uv /root/.local /root/.local 27 | COPY --from=uv --chown=app:app /app/.venv /app/.venv 28 | 29 | # Place executables in the environment at the front of the path 30 | ENV PATH="/app/.venv/bin:$PATH" 31 | 32 | # when running the container, run the main server script 33 | ENTRYPOINT ["python", "gmail_server.py"] 34 | ``` -------------------------------------------------------------------------------- /read_emails.py: -------------------------------------------------------------------------------- ```python 1 | from pathlib import Path 2 | from gmail_api import init_gmail_service, get_email_messages, get_email_message_details, download_attachments_parent 3 | 4 | client_file = 'client_secret.json' 5 | email_identifier = '[email protected]' # Specify the email identifier 6 | 7 | # Initialize Gmail API service 8 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}') 9 | 10 | # Correctly unpack the messages and next_page_token 11 | messages, _ = get_email_messages(service, max_results=5) 12 | 13 | # Target directory to save attachments 14 | attachment_dir = Path('./downloaded_attachments') # Folder to save attachments 15 | attachment_dir.mkdir(exist_ok=True) # Create the folder if it doesn't exist 16 | 17 | # Process Emails 18 | for msg in messages: 19 | details = get_email_message_details(service, msg['id']) 20 | if details: 21 | print(f"Subject: {details['subject']}") 22 | print(f"From: {details['sender']}") 23 | print(f"Recipients: {details['recipients']}") 24 | print(f"Body: {details['body'][:100]}...") # Print first 100 characters of the body 25 | print(f"Snippet: {details['snippet']}") 26 | print(f"Has Attachments: {details['has_attachments']}") 27 | print(f"Date: {details['date']}") 28 | print(f"Star: {details['star']}") 29 | print(f"Label: {details['label']}") 30 | print("-" * 50) 31 | 32 | # Download Attachments if present 33 | if details['has_attachments']: 34 | download_attachments_parent(service, user_id='me', msg_id=msg['id'], target_dir=str(attachment_dir)) 35 | ``` -------------------------------------------------------------------------------- /google_apis.py: -------------------------------------------------------------------------------- ```python 1 | # google_apis.py 2 | import os 3 | from google_auth_oauthlib.flow import InstalledAppFlow 4 | from googleapiclient.discovery import build 5 | from google.oauth2.credentials import Credentials 6 | from google.auth.transport.requests import Request 7 | 8 | def create_service(client_secret_file, api_name, api_version, *scopes, prefix=''): 9 | CLIENT_SECRET_FILE = client_secret_file 10 | API_SERVICE_NAME = api_name 11 | API_VERSION = api_version 12 | 13 | SCOPES = [scope for scope in scopes[0]] 14 | 15 | creds = None 16 | working_dir = os.getcwd() 17 | token_dir = 'token_files' 18 | 19 | # Include the prefix (email identifier) in the token file name 20 | token_file = f'token_{API_SERVICE_NAME}_{API_VERSION}{prefix}.json' 21 | 22 | # Check if the token directory exists, create if not 23 | if not os.path.exists(os.path.join(working_dir, token_dir)): 24 | os.mkdir(os.path.join(working_dir, token_dir)) 25 | 26 | # Load existing credentials if available 27 | if os.path.exists(os.path.join(working_dir, token_dir, token_file)): 28 | creds = Credentials.from_authorized_user_file( 29 | os.path.join(working_dir, token_dir, token_file), SCOPES 30 | ) 31 | 32 | # If no valid credentials, initiate the authentication flow 33 | if not creds or not creds.valid: 34 | if creds and creds.expired and creds.refresh_token: 35 | creds.refresh(Request()) 36 | else: 37 | flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) 38 | creds = flow.run_local_server(port=0) 39 | 40 | # Save the credentials for future use 41 | with open(os.path.join(working_dir, token_dir, token_file), 'w') as token: 42 | token.write(creds.to_json()) 43 | 44 | try: 45 | service = build(API_SERVICE_NAME, API_VERSION, credentials=creds, static_discovery=False) 46 | print(f'{API_SERVICE_NAME} {API_VERSION} service created successfully for {prefix}') 47 | return service 48 | except Exception as e: 49 | print(e) 50 | print(f'Failed to create service instance for {API_SERVICE_NAME}') 51 | # Remove corrupted token file if exists 52 | if os.path.exists(os.path.join(working_dir, token_dir, token_file)): 53 | os.remove(os.path.join(working_dir, token_dir, token_file)) 54 | return None 55 | ``` -------------------------------------------------------------------------------- /gmail_api.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import base64 3 | from email.mime.text import MIMEText 4 | from email.mime.multipart import MIMEMultipart 5 | from email.mime.base import MIMEBase 6 | from email import encoders 7 | from pathlib import Path 8 | from google_apis import create_service 9 | 10 | def init_gmail_service(client_file, api_name='gmail', api_version='v1', scopes=['https://mail.google.com/'], prefix=''): 11 | return create_service(client_file, api_name, api_version, scopes, prefix=prefix) 12 | 13 | 14 | def _extract_body(payload): 15 | body = '<text body not available>' 16 | if 'parts' in payload: 17 | for part in payload['parts']: 18 | if part['mimeType'] == 'multipart/alternative': 19 | for subpart in part['parts']: 20 | if subpart['mimeType'] == 'text/plain' and 'data' in subpart['body']: 21 | body = base64.urlsafe_b64decode(subpart['body']['data']).decode('utf-8') 22 | break 23 | elif part['body']['data']: 24 | body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') 25 | break 26 | return body 27 | 28 | def get_email_messages(service, user_id='me', label_ids=None, folder_name='INBOX', max_results=5): 29 | messages = [] 30 | next_page_token = None 31 | 32 | if folder_name: 33 | label_results = service.users().labels().list(userId=user_id).execute() 34 | labels = label_results.get('labels', []) 35 | folder_label_id = next((label['id'] for label in labels if label['name'].lower() == folder_name.lower()), None) 36 | 37 | if folder_label_id: 38 | message_response = service.users().messages().list(userId=user_id, labelIds=[folder_label_id], maxResults=max_results).execute() 39 | messages.extend(message_response.get('messages', [])) 40 | next_page_token = message_response.get('nextPageToken', None) 41 | 42 | return messages, next_page_token 43 | 44 | def get_email_message_details(service, msg_id): 45 | try: 46 | message = service.users().messages().get(userId='me', id=msg_id).execute() 47 | payload = message['payload'] 48 | headers = payload.get('headers', []) 49 | 50 | subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No subject') 51 | 52 | sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'No sender') 53 | 54 | recipients = next((header['value'] for header in headers if header['name'].lower() == 'to'), 'No recipients') 55 | 56 | snippet = message.get('snippet', 'No snippet') 57 | 58 | has_attachments = any(part.get('filename') for part in payload.get('parts', []) if part.get('filename')) 59 | 60 | date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'No date') 61 | 62 | star = message.get('labelIds', []).count('STARRED') > 0 63 | 64 | label = ', '.join(message.get('labelIds', [])) 65 | 66 | body = _extract_body(payload) 67 | 68 | return { 69 | 'subject': subject, 70 | 'sender': sender, 71 | 'recipients': recipients, 72 | 'body': body, 73 | 'snippet': snippet, 74 | 'has_attachments': has_attachments, 75 | 'date': date, 76 | 'star': star, 77 | 'label': label 78 | } 79 | except Exception as e: 80 | print(f'Error getting email message details: {e}') 81 | return None 82 | def send_email(service, to, subject, body, body_type='plain', attachment_paths=None): 83 | from email.mime.text import MIMEText 84 | from email.mime.multipart import MIMEMultipart 85 | from email.mime.base import MIMEBase 86 | from email import encoders 87 | import base64 88 | import os 89 | from pathlib import Path 90 | import mimetypes 91 | 92 | message = MIMEMultipart() 93 | message['to'] = to 94 | message['subject'] = subject 95 | 96 | message.attach(MIMEText(body, body_type)) 97 | 98 | # Handle attachments 99 | if attachment_paths: 100 | for attachment_path in attachment_paths: 101 | attachment_path = Path(attachment_path) # Ensure it's a Path object 102 | if not attachment_path.exists(): 103 | raise FileNotFoundError(f"File not found - {attachment_path}") 104 | 105 | # Guess the MIME type based on file extension 106 | content_type, encoding = mimetypes.guess_type(attachment_path) 107 | if content_type is None or encoding is not None: 108 | content_type = 'application/octet-stream' 109 | 110 | main_type, sub_type = content_type.split('/', 1) 111 | 112 | with open(attachment_path, 'rb') as attachment_file: 113 | part = MIMEBase(main_type, sub_type) 114 | part.set_payload(attachment_file.read()) 115 | encoders.encode_base64(part) 116 | part.add_header( 117 | 'Content-Disposition', 118 | f'attachment; filename="{attachment_path.name}"' 119 | ) 120 | message.attach(part) 121 | 122 | # Encode message and send 123 | raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode() 124 | send_message = {'raw': raw_message} 125 | 126 | try: 127 | sent_message = service.users().messages().send(userId='me', body=send_message).execute() 128 | print(f"Email sent successfully to {to} with attachments.") 129 | return sent_message 130 | except Exception as e: 131 | print(f"An error occurred: {e}") 132 | return None 133 | def download_attachments_parent(service, user_id, msg_id, target_dir): 134 | message = service.users().messages().get(userId=user_id, id=msg_id).execute() 135 | for part in message['payload']['parts']: 136 | if part['filename']: 137 | att_id = part['body']['attachmentId'] 138 | att = service.users().messages().attachments().get(userId=user_id, messageId=msg_id, id=att_id).execute() 139 | data = att['data'] 140 | file_data = base64.urlsafe_b64decode(data.encode('UTF-8')) 141 | file_path = os.path.join(target_dir, part['filename']) 142 | print('Saving attachment to:', file_path) 143 | with open(file_path, 'wb') as f: 144 | f.write(file_data) 145 | 146 | def download_attachments_all(service, user_id, msg_id, target_dir): 147 | thread = service.users().threads().get(userId=user_id, id=msg_id).execute() 148 | for message in thread['messages']: 149 | for part in message['payload']['parts']: 150 | if part['filename']: 151 | att_id = part['body']['attachmentId'] 152 | att = service.users().messages().attachments().get(userId=user_id, messageId=message['id'], id=att_id).execute() 153 | data = att['data'] 154 | file_data = base64.urlsafe_b64decode(data.encode('UTF-8')) 155 | file_path = os.path.join(target_dir, part['filename']) 156 | print('Saving attachment to:', file_path) 157 | with open(file_path, 'wb') as f: 158 | f.write(file_data) 159 | def search_emails(service, query, user_id='me', max_results=5): 160 | messages = [] 161 | next_page_token = None 162 | 163 | while True: 164 | result = service.users().messages().list( 165 | userId=user_id, 166 | q=query, 167 | maxResults=min(500, max_results - len(messages)) if max_results else 500, 168 | pageToken=next_page_token 169 | ).execute() 170 | 171 | messages.extend(result.get('messages', [])) 172 | 173 | next_page_token = result.get('nextPageToken') 174 | 175 | if not next_page_token or (max_results and len(messages) >= max_results): 176 | break 177 | 178 | return messages[:max_results] if max_results else messages 179 | 180 | def search_email_conversations(service, query, user_id='me', max_results=5): 181 | conversations = [] 182 | next_page_token = None 183 | 184 | while True: 185 | result = service.users().threads().list( 186 | userId=user_id, 187 | q=query, 188 | maxResults=min(500, max_results - len(conversations)) if max_results else 500, 189 | pageToken=next_page_token 190 | ).execute() 191 | 192 | conversations.extend(result.get('threads', [])) 193 | 194 | next_page_token = result.get('nextPageToken') 195 | 196 | if not next_page_token or (max_results and len(conversations) >= max_results): 197 | break 198 | 199 | return conversations[:max_results] if max_results else conversations 200 | ``` -------------------------------------------------------------------------------- /gmail_server.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import logging 3 | from pathlib import Path 4 | from typing import List, Optional, Dict, Any 5 | from mcp.server.fastmcp import FastMCP, Context 6 | 7 | from gmail_api import ( 8 | init_gmail_service, 9 | get_email_message_details, 10 | search_emails, 11 | search_email_conversations, 12 | send_email, 13 | get_email_messages, 14 | download_attachments_parent, 15 | download_attachments_all 16 | ) 17 | 18 | # Configure logging 19 | logging.basicConfig( 20 | level=logging.INFO, 21 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', 22 | handlers=[ 23 | logging.FileHandler('gmail_mcp.log'), 24 | logging.StreamHandler() 25 | ] 26 | ) 27 | logger = logging.getLogger('gmail_mcp') 28 | 29 | # Initialize MCP Server 30 | mcp = FastMCP( 31 | "Gmail MCP Server", 32 | dependencies=["google-api-python-client", "google-auth-oauthlib"] 33 | ) 34 | 35 | # Gmail Service Initialization 36 | CLIENT_FILE = 'client_secret.json' 37 | 38 | def get_gmail_service(email_identifier: str): 39 | try: 40 | service = init_gmail_service(CLIENT_FILE, prefix=f'_{email_identifier}') 41 | if not service: 42 | raise ValueError(f"Failed to initialize Gmail service for {email_identifier}") 43 | return service 44 | except Exception as e: 45 | logger.error(f"Error initializing Gmail service: {str(e)}") 46 | raise 47 | 48 | # Resources 49 | @mcp.resource("gmail://inbox/{email_identifier}") 50 | async def get_inbox(email_identifier: str) -> Dict[str, Any]: 51 | """Get latest emails from inbox""" 52 | try: 53 | logger.info(f"Fetching inbox for {email_identifier}") 54 | service = get_gmail_service(email_identifier) 55 | messages, next_page = get_email_messages(service, max_results=10) 56 | emails = [] 57 | for msg in messages: 58 | details = get_email_message_details(service, msg['id']) 59 | if details: 60 | emails.append(details) 61 | return { 62 | "success": True, 63 | "emails": emails, 64 | "has_more": bool(next_page) 65 | } 66 | except Exception as e: 67 | logger.error(f"Error fetching inbox: {str(e)}") 68 | return {"success": False, "message": str(e)} 69 | 70 | @mcp.resource("gmail://email/{email_identifier}/{msg_id}") 71 | async def get_email_details(email_identifier: str, msg_id: str) -> Dict[str, Any]: 72 | """Get detailed information about a specific email""" 73 | try: 74 | logger.info(f"Fetching email details for ID {msg_id}") 75 | service = get_gmail_service(email_identifier) 76 | details = get_email_message_details(service, msg_id) 77 | if details: 78 | return {"success": True, "email": details} 79 | return {"success": False, "message": "Email not found"} 80 | except Exception as e: 81 | logger.error(f"Error fetching email details: {str(e)}") 82 | return {"success": False, "message": str(e)} 83 | 84 | @mcp.resource("gmail://attachments/{email_identifier}/{msg_id}") 85 | async def list_attachments(email_identifier: str, msg_id: str) -> Dict[str, Any]: 86 | """List attachments for a specific email""" 87 | try: 88 | logger.info(f"Listing attachments for email {msg_id}") 89 | service = get_gmail_service(email_identifier) 90 | details = get_email_message_details(service, msg_id) 91 | if details and details.get('has_attachments'): 92 | return { 93 | "success": True, 94 | "has_attachments": True, 95 | "message_id": msg_id 96 | } 97 | return { 98 | "success": True, 99 | "has_attachments": False 100 | } 101 | except Exception as e: 102 | logger.error(f"Error listing attachments: {str(e)}") 103 | return {"success": False, "message": str(e)} 104 | 105 | # Tools 106 | @mcp.tool() 107 | async def send_gmail( 108 | email_identifier: str, 109 | to: str, 110 | subject: str, 111 | body: str, 112 | attachment_paths: Optional[List[str]] = None 113 | ) -> Dict[str, Any]: 114 | """Send an email with optional attachments""" 115 | try: 116 | logger.info(f"Sending email to {to} from {email_identifier}") 117 | service = get_gmail_service(email_identifier) 118 | 119 | # Validate attachment paths 120 | if attachment_paths: 121 | for path in attachment_paths: 122 | if not os.path.exists(path): 123 | return { 124 | "success": False, 125 | "message": f"Attachment not found: {path}" 126 | } 127 | 128 | response = send_email( 129 | service=service, 130 | to=to, 131 | subject=subject, 132 | body=body, 133 | body_type='plain', 134 | attachment_paths=attachment_paths 135 | ) 136 | 137 | if response: 138 | return { 139 | "success": True, 140 | "message": f"Email sent successfully to {to}", 141 | "message_id": response.get('id', 'unknown') 142 | } 143 | return { 144 | "success": False, 145 | "message": "Failed to send email" 146 | } 147 | except Exception as e: 148 | logger.error(f"Error sending email: {str(e)}") 149 | return {"success": False, "message": str(e)} 150 | 151 | @mcp.tool() 152 | async def search_email_tool( 153 | email_identifier: str, 154 | query: str = '', 155 | max_results: int = 30, 156 | include_conversations: bool = True 157 | ) -> Dict[str, Any]: 158 | """Search emails with optional conversation inclusion""" 159 | try: 160 | logger.info(f"Searching emails for {email_identifier} with query: {query}") 161 | service = get_gmail_service(email_identifier) 162 | 163 | emails = [] 164 | # Search regular emails 165 | messages = search_emails(service, query, max_results=max_results) 166 | for msg in messages: 167 | details = get_email_message_details(service, msg['id']) 168 | if details: 169 | emails.append(details) 170 | 171 | # Search conversations if requested 172 | if include_conversations: 173 | conversations = search_email_conversations(service, query, max_results=max_results) 174 | for conv in conversations: 175 | details = get_email_message_details(service, conv['id']) 176 | if details: 177 | emails.append(details) 178 | 179 | return { 180 | "success": True, 181 | "message": f"Found {len(emails)} emails", 182 | "emails": emails 183 | } 184 | except Exception as e: 185 | logger.error(f"Error searching emails: {str(e)}") 186 | return {"success": False, "message": str(e), "emails": []} 187 | 188 | @mcp.tool() 189 | async def read_latest_emails( 190 | email_identifier: str, 191 | max_results: int = 5, 192 | download_attachments: bool = False 193 | ) -> Dict[str, Any]: 194 | """Read latest emails with optional attachment download""" 195 | try: 196 | logger.info(f"Reading latest {max_results} emails for {email_identifier}") 197 | service = get_gmail_service(email_identifier) 198 | 199 | messages, _ = get_email_messages(service, max_results=max_results) 200 | emails = [] 201 | 202 | attachment_dir = Path('./downloaded_attachments') 203 | if download_attachments: 204 | attachment_dir.mkdir(exist_ok=True) 205 | 206 | for msg in messages: 207 | details = get_email_message_details(service, msg['id']) 208 | if details: 209 | if download_attachments and details.get('has_attachments'): 210 | download_attachments_parent( 211 | service, 212 | user_id='me', 213 | msg_id=msg['id'], 214 | target_dir=str(attachment_dir) 215 | ) 216 | details['attachments_downloaded'] = True 217 | details['attachment_dir'] = str(attachment_dir) 218 | emails.append(details) 219 | 220 | return { 221 | "success": True, 222 | "message": f"Retrieved {len(emails)} latest emails", 223 | "emails": emails, 224 | "attachment_downloads": download_attachments 225 | } 226 | except Exception as e: 227 | logger.error(f"Error reading latest emails: {str(e)}") 228 | return {"success": False, "message": str(e), "emails": []} 229 | 230 | @mcp.tool() 231 | async def download_email_attachments( 232 | email_identifier: str, 233 | msg_id: str, 234 | download_all_in_thread: bool = False 235 | ) -> Dict[str, Any]: 236 | """Download attachments for a specific email or its entire thread""" 237 | try: 238 | logger.info(f"Downloading attachments for email {msg_id}") 239 | service = get_gmail_service(email_identifier) 240 | 241 | attachment_dir = Path('./downloaded_attachments') 242 | attachment_dir.mkdir(exist_ok=True) 243 | 244 | if download_all_in_thread: 245 | download_attachments_all( 246 | service, 247 | user_id='me', 248 | msg_id=msg_id, 249 | target_dir=str(attachment_dir) 250 | ) 251 | else: 252 | download_attachments_parent( 253 | service, 254 | user_id='me', 255 | msg_id=msg_id, 256 | target_dir=str(attachment_dir) 257 | ) 258 | 259 | return { 260 | "success": True, 261 | "message": "Attachments downloaded successfully", 262 | "directory": str(attachment_dir), 263 | "thread_downloaded": download_all_in_thread 264 | } 265 | except Exception as e: 266 | logger.error(f"Error downloading attachments: {str(e)}") 267 | return {"success": False, "message": str(e)} 268 | 269 | # Prompts 270 | @mcp.prompt() 271 | def compose_email_prompt() -> Dict[str, Any]: 272 | """Guide for composing and sending an email""" 273 | return { 274 | "description": "Guide for composing and sending an email", 275 | "messages": [ 276 | { 277 | "role": "system", 278 | "content": """You're helping the user compose and send an email. Make sure to collect: 279 | 1. Email identifier (the account sending the email) 280 | 2. Recipient's email address 281 | 3. Subject line 282 | 4. Email body content 283 | 5. Any attachments (optional) - provide full file paths""" 284 | }, 285 | { 286 | "role": "user", 287 | "content": "I need to send an email." 288 | }, 289 | { 290 | "role": "assistant", 291 | "content": """I'll help you compose and send an email. Please provide: 292 | 293 | 1. Which email account should send this? (email identifier) 294 | 2. Who are you sending it to? (recipient's email) 295 | 3. What's the subject of your email? 296 | 4. What would you like to say in the email? 297 | 5. Do you need to attach any files? If yes, please provide the file paths.""" 298 | } 299 | ] 300 | } 301 | 302 | @mcp.prompt() 303 | def search_email_prompt() -> Dict[str, Any]: 304 | """Guide for searching emails with various criteria""" 305 | return { 306 | "description": "Guide for searching emails", 307 | "messages": [ 308 | { 309 | "role": "system", 310 | "content": """You're helping the user search through their emails. Collect: 311 | 1. Email identifier (which account to search) 312 | 2. Search criteria (from, to, subject, date range, etc.) 313 | 3. Maximum number of results needed 314 | 4. Whether to include conversation threads""" 315 | }, 316 | { 317 | "role": "user", 318 | "content": "I want to search my emails." 319 | }, 320 | { 321 | "role": "assistant", 322 | "content": """I'll help you search your emails. Please specify: 323 | 324 | 1. Which email account do you want to search? (email identifier) 325 | 2. What are you looking for? You can search by: 326 | - Sender (from:[email protected]) 327 | - Subject (subject:meeting) 328 | - Date range (after:2024/01/01 before:2024/02/01) 329 | - Has attachment (has:attachment) 330 | Or combine these criteria. 331 | 3. How many results would you like to see? (default is 30) 332 | 4. Should I include conversation threads in the search? (yes/no)""" 333 | } 334 | ] 335 | } 336 | 337 | @mcp.prompt() 338 | def read_latest_emails_prompt() -> Dict[str, Any]: 339 | """Guide for reading recent emails with optional attachment handling""" 340 | return { 341 | "description": "Guide for reading latest emails", 342 | "messages": [ 343 | { 344 | "role": "system", 345 | "content": """You're helping the user read their recent emails. Collect: 346 | 1. Email identifier (which account to read) 347 | 2. Number of emails to retrieve 348 | 3. Whether to automatically download attachments""" 349 | }, 350 | { 351 | "role": "user", 352 | "content": "I want to check my recent emails." 353 | }, 354 | { 355 | "role": "assistant", 356 | "content": """I'll help you check your recent emails. Please specify: 357 | 358 | 1. Which email account do you want to check? (email identifier) 359 | 2. How many recent emails would you like to see? (default is 5) 360 | 3. Should I automatically download any attachments found? (yes/no) 361 | Note: Attachments will be saved to a 'downloaded_attachments' folder.""" 362 | } 363 | ] 364 | } 365 | 366 | @mcp.prompt() 367 | def download_attachments_prompt() -> Dict[str, Any]: 368 | """Guide for downloading email attachments""" 369 | return { 370 | "description": "Guide for downloading email attachments", 371 | "messages": [ 372 | { 373 | "role": "system", 374 | "content": """You're helping the user download email attachments. Collect: 375 | 1. Email identifier (which account to use) 376 | 2. Message ID of the email 377 | 3. Whether to download attachments from the entire conversation thread""" 378 | }, 379 | { 380 | "role": "user", 381 | "content": "I want to download attachments from an email." 382 | }, 383 | { 384 | "role": "assistant", 385 | "content": """I'll help you download email attachments. Please provide: 386 | 387 | 1. Which email account has the attachments? (email identifier) 388 | 2. What's the Message ID of the email? (You can get this from search results) 389 | 3. Do you want to download attachments from the entire conversation thread? (yes/no) 390 | Note: Files will be saved to a 'downloaded_attachments' folder.""" 391 | } 392 | ] 393 | } 394 | 395 | if __name__ == "__main__": 396 | try: 397 | logger.info("Starting Gmail MCP server...") 398 | mcp.run() 399 | except KeyboardInterrupt: 400 | logger.info("Server shutting down gracefully...") 401 | except Exception as e: 402 | logger.error(f"Fatal server error: {str(e)}") 403 | ```