# Directory Structure ``` ├── .gitignore ├── .python-version ├── Dockerfile ├── gmail_api.py ├── gmail_mcp.log ├── gmail_server.py ├── gmail_token_creator.py ├── google_apis.py ├── LICENSE ├── pyproject.toml ├── read_emails.py ├── README.md ├── search_emails.py ├── send_emails.py ├── smithery.yaml └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python virtual environments venv/ .env # Token files and secrets client_secret.json token_files/ *.log __pycache__/ *.pyc # Mac/Windows files .DS_Store ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Gmail MCP Server [](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server) A powerful and flexible Gmail integration server built using the MCP (Message Control Protocol) framework. This server provides a robust interface to interact with Gmail APIs, offering functionality for reading, sending, and managing emails programmatically. ## Features - Read emails from multiple Gmail accounts - Send emails with attachments - Search emails with advanced query options - Download email attachments - Handle email conversations and threads - Real-time email monitoring - Support for multiple Gmail accounts ## Prerequisites Before running the Gmail MCP server, ensure you have the following: 1. Python 3.12 or higher 2. Google Cloud Project with Gmail API enabled 3. OAuth 2.0 Client ID credentials 4. Required Python packages (specified in pyproject.toml) ## Installation ### Installing via Smithery To install Gmail Integration Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server): ```bash npx -y @smithery/cli install @Quantum-369/Gmail-mcp-server --client claude ``` 1. Clone the repository: ```bash git clone <your-repository-url> cd gmail-mcp-server ``` 2. Create and activate a virtual environment: ```bash python -m venv venv # On Windows venv\Scripts\activate # On Unix/MacOS source venv/bin/activate ``` 3. Install dependencies: ```bash pip install . ``` ## Running on Ubuntu 1. Install Python 3.12 and create a virtual environment if you haven't already: ```bash sudo apt install python3.12 python3.12-venv -y python3.12 -m venv venv source venv/bin/activate ``` 2. Place your downloaded `client_secret.json` in the project root. 3. Generate an OAuth token for the Gmail account you want to use. Copy the URL printed by the script into a web browser and complete the sign-in process: ```bash python gmail_token_creator.py ``` 4. Start the server: ```bash python gmail_server.py ``` ## Setup Google Cloud Project 1. Go to the [Google Cloud Console](https://console.cloud.google.com/) 2. Create a new project or select an existing one 3. Enable the Gmail API for your project 4. Create OAuth 2.0 credentials: - Go to "APIs & Services" > "Credentials" - Click "Create Credentials" > "OAuth client ID" - Choose "Desktop app" as application type - Download the client configuration file 5. Rename the downloaded file to `client_secret.json` and place it in the project root directory ## Configuration 1. Set up email identifiers in `gmail_token_creator.py`: ```python email_identifier = '[email protected]' # Change this for each account ``` 2. Run the token creator to authenticate your Gmail accounts: ```bash python gmail_token_creator.py ``` The script prints an authorization URL. Copy this URL into your web browser, complete the Google consent flow, and copy the verification code back into the terminal if prompted. A token file will be created inside `token_files/` for future use. 3. Repeat the process for each Gmail account you want to integrate ## Server Structure - `gmail_server.py`: Main MCP server implementation - `gmail_api.py`: Gmail API interaction functions - `google_apis.py`: Google API authentication utilities - Supporting files: - `read_emails.py`: Email reading functionality - `search_emails.py`: Email search functionality - `send_emails.py`: Email sending functionality ## Usage ### Starting the Server ```bash python gmail_server.py ``` ### Running with Docker You can also build a container image using the provided `Dockerfile`: ```bash docker build -t gmail-mcp-server . docker run -v $(pwd)/client_secret.json:/app/client_secret.json \ -v $(pwd)/token_files:/app/token_files gmail-mcp-server ``` The container runs the same server and stores authentication tokens in the `token_files` directory on the host so they persist between runs. ### Available Tools 1. Send Email: ```python await send_gmail( email_identifier="[email protected]", to="[email protected]", subject="Test Subject", body="Email body content", attachment_paths=["path/to/attachment"] ) ``` 2. Search Emails: ```python await search_email_tool( email_identifier="[email protected]", query="from:[email protected]", max_results=30, include_conversations=True ) ``` 3. Read Latest Emails: ```python await read_latest_emails( email_identifier="[email protected]", max_results=5, download_attachments=False ) ``` 4. Download Attachments: ```python await download_email_attachments( email_identifier="[email protected]", msg_id="message_id", download_all_in_thread=False ) ``` ## Security Considerations - Store `client_secret.json` securely and never commit it to version control - Keep token files secure and add them to `.gitignore` - Use environment variables for sensitive information - Regularly rotate OAuth credentials - Monitor API usage and set appropriate quotas ## Error Handling The server includes comprehensive error handling and logging: - Logs are written to `gmail_mcp.log` - Both file and console logging are enabled - Detailed error messages for debugging ## Contributing 1. Fork the repository 2. Create a feature branch 3. Commit your changes 4. Push to the branch 5. Create a Pull Request ## License Apachelicense2.0 ## Support For issues and feature requests, please use the GitHub issue tracker. ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "gmail-server" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.12" dependencies = [ "google-api-python-client>=2.160.0", "google-auth>=2.38.0", "google-auth-oauthlib>=1.2.1", "mcp[cli]>=1.2.1", ] ``` -------------------------------------------------------------------------------- /gmail_token_creator.py: -------------------------------------------------------------------------------- ```python # gmail_api.py from google_apis import create_service client_secret_file = 'client_secret.json' API_SERVICE_NAME = 'gmail' API_VERSION = 'v1' SCOPES = ['https://mail.google.com/'] # Add an identifier for each account, like the email address email_identifier = '[email protected]' # Change this for each account #[email protected] #[email protected] #[email protected] #[email protected] #[email protected] # Pass the email identifier as a prefix to create unique token files service = create_service(client_secret_file, API_SERVICE_NAME, API_VERSION, SCOPES, prefix=f'_{email_identifier}') #print(dir(service)) ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: - clientSecretPath - emailIdentifier properties: clientSecretPath: type: string description: Path to the OAuth 2.0 client secret JSON file. emailIdentifier: type: string description: Email identifier for the Gmail account. commandFunction: # A function that produces the CLI command to start the MCP on stdio. |- (config) => ({ command: 'python', args: ['gmail_server.py'], env: { CLIENT_SECRET_PATH: config.clientSecretPath, EMAIL_IDENTIFIER: config.emailIdentifier } }) ``` -------------------------------------------------------------------------------- /send_emails.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from gmail_api import init_gmail_service, send_email # Configuration client_file = 'client_secret.json' email_identifier = '[email protected]' # Change this for each account # Initialize Gmail API service for the specific email account service = init_gmail_service(client_file, prefix=f'_{email_identifier}') # Email details to_address = '[email protected]' email_subject = 'MCP servers document' email_body = 'This is a test email sent using the Gmail API.' # Attachments attachment_dir = Path('C:\\Users\\harsh\\Downloads\\MS projects\\MCP\\mcpdocs') attachment_files = list(attachment_dir.glob('*')) # Load all files from the attachments folder # Send the email response_email_sent = send_email( service=service, to=to_address, subject=email_subject, body=email_body, body_type='plain', attachment_paths=attachment_files ) # Output response print(response_email_sent) ``` -------------------------------------------------------------------------------- /search_emails.py: -------------------------------------------------------------------------------- ```python from gmail_api import init_gmail_service, get_email_message_details, search_emails, search_email_conversations client_file = 'client_secret.json' email_identifier = '[email protected]' # Specify the email identifier # Initialize Gmail API service service = init_gmail_service(client_file, prefix=f'_{email_identifier}') query = 'from:me' email_messages = search_emails(service, query, max_results=30) email_messages += search_email_conversations(service, query, max_results=30) # Combine both for message in email_messages: email_detail = get_email_message_details(service, message['id']) if email_detail: # Check if email details were fetched successfully print(message['id']) print(f"Subject: {email_detail.get('subject', 'No Subject')}") print(f"Date: {email_detail.get('date', 'No Date')}") print(f"Label: {email_detail.get('label', 'No Label')}") print('Snippet: ', email_detail.get('snippet', 'No Snippet')) print(f"Body: {email_detail.get('body', 'No Body')}") print('-' * 50) else: print(f"Skipping message ID {message['id']} as it couldn't be retrieved.") ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile # Use a Python image with uv pre-installed FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv # Install the project into /app WORKDIR /app # Enable bytecode compilation ENV UV_COMPILE_BYTECODE=1 # Copy from the cache instead of linking since it's a mounted volume ENV UV_LINK_MODE=copy # Install the project's dependencies using the lockfile and settings RUN --mount=type=cache,target=/root/.cache/uv --mount=type=bind,source=uv.lock,target=uv.lock --mount=type=bind,source=pyproject.toml,target=pyproject.toml uv sync --frozen --no-install-project --no-dev --no-editable # Then, add the rest of the project source code and install it # Installing separately from its dependencies allows optimal layer caching ADD . /app RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev --no-editable FROM python:3.12-slim-bookworm WORKDIR /app COPY --from=uv /root/.local /root/.local COPY --from=uv --chown=app:app /app/.venv /app/.venv # Place executables in the environment at the front of the path ENV PATH="/app/.venv/bin:$PATH" # when running the container, run the main server script ENTRYPOINT ["python", "gmail_server.py"] ``` -------------------------------------------------------------------------------- /read_emails.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from gmail_api import init_gmail_service, get_email_messages, get_email_message_details, download_attachments_parent client_file = 'client_secret.json' email_identifier = '[email protected]' # Specify the email identifier # Initialize Gmail API service service = init_gmail_service(client_file, prefix=f'_{email_identifier}') # Correctly unpack the messages and next_page_token messages, _ = get_email_messages(service, max_results=5) # Target directory to save attachments attachment_dir = Path('./downloaded_attachments') # Folder to save attachments attachment_dir.mkdir(exist_ok=True) # Create the folder if it doesn't exist # Process Emails for msg in messages: details = get_email_message_details(service, msg['id']) if details: print(f"Subject: {details['subject']}") print(f"From: {details['sender']}") print(f"Recipients: {details['recipients']}") print(f"Body: {details['body'][:100]}...") # Print first 100 characters of the body print(f"Snippet: {details['snippet']}") print(f"Has Attachments: {details['has_attachments']}") print(f"Date: {details['date']}") print(f"Star: {details['star']}") print(f"Label: {details['label']}") print("-" * 50) # Download Attachments if present if details['has_attachments']: download_attachments_parent(service, user_id='me', msg_id=msg['id'], target_dir=str(attachment_dir)) ``` -------------------------------------------------------------------------------- /google_apis.py: -------------------------------------------------------------------------------- ```python # google_apis.py import os from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request def create_service(client_secret_file, api_name, api_version, *scopes, prefix=''): CLIENT_SECRET_FILE = client_secret_file API_SERVICE_NAME = api_name API_VERSION = api_version SCOPES = [scope for scope in scopes[0]] creds = None working_dir = os.getcwd() token_dir = 'token_files' # Include the prefix (email identifier) in the token file name token_file = f'token_{API_SERVICE_NAME}_{API_VERSION}{prefix}.json' # Check if the token directory exists, create if not if not os.path.exists(os.path.join(working_dir, token_dir)): os.mkdir(os.path.join(working_dir, token_dir)) # Load existing credentials if available if os.path.exists(os.path.join(working_dir, token_dir, token_file)): creds = Credentials.from_authorized_user_file( os.path.join(working_dir, token_dir, token_file), SCOPES ) # If no valid credentials, initiate the authentication flow if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) creds = flow.run_local_server(port=0) # Save the credentials for future use with open(os.path.join(working_dir, token_dir, token_file), 'w') as token: token.write(creds.to_json()) try: service = build(API_SERVICE_NAME, API_VERSION, credentials=creds, static_discovery=False) print(f'{API_SERVICE_NAME} {API_VERSION} service created successfully for {prefix}') return service except Exception as e: print(e) print(f'Failed to create service instance for {API_SERVICE_NAME}') # Remove corrupted token file if exists if os.path.exists(os.path.join(working_dir, token_dir, token_file)): os.remove(os.path.join(working_dir, token_dir, token_file)) return None ``` -------------------------------------------------------------------------------- /gmail_api.py: -------------------------------------------------------------------------------- ```python import os import base64 from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders from pathlib import Path from google_apis import create_service def init_gmail_service(client_file, api_name='gmail', api_version='v1', scopes=['https://mail.google.com/'], prefix=''): return create_service(client_file, api_name, api_version, scopes, prefix=prefix) def _extract_body(payload): body = '<text body not available>' if 'parts' in payload: for part in payload['parts']: if part['mimeType'] == 'multipart/alternative': for subpart in part['parts']: if subpart['mimeType'] == 'text/plain' and 'data' in subpart['body']: body = base64.urlsafe_b64decode(subpart['body']['data']).decode('utf-8') break elif part['body']['data']: body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') break return body def get_email_messages(service, user_id='me', label_ids=None, folder_name='INBOX', max_results=5): messages = [] next_page_token = None if folder_name: label_results = service.users().labels().list(userId=user_id).execute() labels = label_results.get('labels', []) folder_label_id = next((label['id'] for label in labels if label['name'].lower() == folder_name.lower()), None) if folder_label_id: message_response = service.users().messages().list(userId=user_id, labelIds=[folder_label_id], maxResults=max_results).execute() messages.extend(message_response.get('messages', [])) next_page_token = message_response.get('nextPageToken', None) return messages, next_page_token def get_email_message_details(service, msg_id): try: message = service.users().messages().get(userId='me', id=msg_id).execute() payload = message['payload'] headers = payload.get('headers', []) subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No subject') sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'No sender') recipients = next((header['value'] for header in headers if header['name'].lower() == 'to'), 'No recipients') snippet = message.get('snippet', 'No snippet') has_attachments = any(part.get('filename') for part in payload.get('parts', []) if part.get('filename')) date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'No date') star = message.get('labelIds', []).count('STARRED') > 0 label = ', '.join(message.get('labelIds', [])) body = _extract_body(payload) return { 'subject': subject, 'sender': sender, 'recipients': recipients, 'body': body, 'snippet': snippet, 'has_attachments': has_attachments, 'date': date, 'star': star, 'label': label } except Exception as e: print(f'Error getting email message details: {e}') return None def send_email(service, to, subject, body, body_type='plain', attachment_paths=None): from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders import base64 import os from pathlib import Path import mimetypes message = MIMEMultipart() message['to'] = to message['subject'] = subject message.attach(MIMEText(body, body_type)) # Handle attachments if attachment_paths: for attachment_path in attachment_paths: attachment_path = Path(attachment_path) # Ensure it's a Path object if not attachment_path.exists(): raise FileNotFoundError(f"File not found - {attachment_path}") # Guess the MIME type based on file extension content_type, encoding = mimetypes.guess_type(attachment_path) if content_type is None or encoding is not None: content_type = 'application/octet-stream' main_type, sub_type = content_type.split('/', 1) with open(attachment_path, 'rb') as attachment_file: part = MIMEBase(main_type, sub_type) part.set_payload(attachment_file.read()) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename="{attachment_path.name}"' ) message.attach(part) # Encode message and send raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode() send_message = {'raw': raw_message} try: sent_message = service.users().messages().send(userId='me', body=send_message).execute() print(f"Email sent successfully to {to} with attachments.") return sent_message except Exception as e: print(f"An error occurred: {e}") return None def download_attachments_parent(service, user_id, msg_id, target_dir): message = service.users().messages().get(userId=user_id, id=msg_id).execute() for part in message['payload']['parts']: if part['filename']: att_id = part['body']['attachmentId'] att = service.users().messages().attachments().get(userId=user_id, messageId=msg_id, id=att_id).execute() data = att['data'] file_data = base64.urlsafe_b64decode(data.encode('UTF-8')) file_path = os.path.join(target_dir, part['filename']) print('Saving attachment to:', file_path) with open(file_path, 'wb') as f: f.write(file_data) def download_attachments_all(service, user_id, msg_id, target_dir): thread = service.users().threads().get(userId=user_id, id=msg_id).execute() for message in thread['messages']: for part in message['payload']['parts']: if part['filename']: att_id = part['body']['attachmentId'] att = service.users().messages().attachments().get(userId=user_id, messageId=message['id'], id=att_id).execute() data = att['data'] file_data = base64.urlsafe_b64decode(data.encode('UTF-8')) file_path = os.path.join(target_dir, part['filename']) print('Saving attachment to:', file_path) with open(file_path, 'wb') as f: f.write(file_data) def search_emails(service, query, user_id='me', max_results=5): messages = [] next_page_token = None while True: result = service.users().messages().list( userId=user_id, q=query, maxResults=min(500, max_results - len(messages)) if max_results else 500, pageToken=next_page_token ).execute() messages.extend(result.get('messages', [])) next_page_token = result.get('nextPageToken') if not next_page_token or (max_results and len(messages) >= max_results): break return messages[:max_results] if max_results else messages def search_email_conversations(service, query, user_id='me', max_results=5): conversations = [] next_page_token = None while True: result = service.users().threads().list( userId=user_id, q=query, maxResults=min(500, max_results - len(conversations)) if max_results else 500, pageToken=next_page_token ).execute() conversations.extend(result.get('threads', [])) next_page_token = result.get('nextPageToken') if not next_page_token or (max_results and len(conversations) >= max_results): break return conversations[:max_results] if max_results else conversations ``` -------------------------------------------------------------------------------- /gmail_server.py: -------------------------------------------------------------------------------- ```python import os import logging from pathlib import Path from typing import List, Optional, Dict, Any from mcp.server.fastmcp import FastMCP, Context from gmail_api import ( init_gmail_service, get_email_message_details, search_emails, search_email_conversations, send_email, get_email_messages, download_attachments_parent, download_attachments_all ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('gmail_mcp.log'), logging.StreamHandler() ] ) logger = logging.getLogger('gmail_mcp') # Initialize MCP Server mcp = FastMCP( "Gmail MCP Server", dependencies=["google-api-python-client", "google-auth-oauthlib"] ) # Gmail Service Initialization CLIENT_FILE = 'client_secret.json' def get_gmail_service(email_identifier: str): try: service = init_gmail_service(CLIENT_FILE, prefix=f'_{email_identifier}') if not service: raise ValueError(f"Failed to initialize Gmail service for {email_identifier}") return service except Exception as e: logger.error(f"Error initializing Gmail service: {str(e)}") raise # Resources @mcp.resource("gmail://inbox/{email_identifier}") async def get_inbox(email_identifier: str) -> Dict[str, Any]: """Get latest emails from inbox""" try: logger.info(f"Fetching inbox for {email_identifier}") service = get_gmail_service(email_identifier) messages, next_page = get_email_messages(service, max_results=10) emails = [] for msg in messages: details = get_email_message_details(service, msg['id']) if details: emails.append(details) return { "success": True, "emails": emails, "has_more": bool(next_page) } except Exception as e: logger.error(f"Error fetching inbox: {str(e)}") return {"success": False, "message": str(e)} @mcp.resource("gmail://email/{email_identifier}/{msg_id}") async def get_email_details(email_identifier: str, msg_id: str) -> Dict[str, Any]: """Get detailed information about a specific email""" try: logger.info(f"Fetching email details for ID {msg_id}") service = get_gmail_service(email_identifier) details = get_email_message_details(service, msg_id) if details: return {"success": True, "email": details} return {"success": False, "message": "Email not found"} except Exception as e: logger.error(f"Error fetching email details: {str(e)}") return {"success": False, "message": str(e)} @mcp.resource("gmail://attachments/{email_identifier}/{msg_id}") async def list_attachments(email_identifier: str, msg_id: str) -> Dict[str, Any]: """List attachments for a specific email""" try: logger.info(f"Listing attachments for email {msg_id}") service = get_gmail_service(email_identifier) details = get_email_message_details(service, msg_id) if details and details.get('has_attachments'): return { "success": True, "has_attachments": True, "message_id": msg_id } return { "success": True, "has_attachments": False } except Exception as e: logger.error(f"Error listing attachments: {str(e)}") return {"success": False, "message": str(e)} # Tools @mcp.tool() async def send_gmail( email_identifier: str, to: str, subject: str, body: str, attachment_paths: Optional[List[str]] = None ) -> Dict[str, Any]: """Send an email with optional attachments""" try: logger.info(f"Sending email to {to} from {email_identifier}") service = get_gmail_service(email_identifier) # Validate attachment paths if attachment_paths: for path in attachment_paths: if not os.path.exists(path): return { "success": False, "message": f"Attachment not found: {path}" } response = send_email( service=service, to=to, subject=subject, body=body, body_type='plain', attachment_paths=attachment_paths ) if response: return { "success": True, "message": f"Email sent successfully to {to}", "message_id": response.get('id', 'unknown') } return { "success": False, "message": "Failed to send email" } except Exception as e: logger.error(f"Error sending email: {str(e)}") return {"success": False, "message": str(e)} @mcp.tool() async def search_email_tool( email_identifier: str, query: str = '', max_results: int = 30, include_conversations: bool = True ) -> Dict[str, Any]: """Search emails with optional conversation inclusion""" try: logger.info(f"Searching emails for {email_identifier} with query: {query}") service = get_gmail_service(email_identifier) emails = [] # Search regular emails messages = search_emails(service, query, max_results=max_results) for msg in messages: details = get_email_message_details(service, msg['id']) if details: emails.append(details) # Search conversations if requested if include_conversations: conversations = search_email_conversations(service, query, max_results=max_results) for conv in conversations: details = get_email_message_details(service, conv['id']) if details: emails.append(details) return { "success": True, "message": f"Found {len(emails)} emails", "emails": emails } except Exception as e: logger.error(f"Error searching emails: {str(e)}") return {"success": False, "message": str(e), "emails": []} @mcp.tool() async def read_latest_emails( email_identifier: str, max_results: int = 5, download_attachments: bool = False ) -> Dict[str, Any]: """Read latest emails with optional attachment download""" try: logger.info(f"Reading latest {max_results} emails for {email_identifier}") service = get_gmail_service(email_identifier) messages, _ = get_email_messages(service, max_results=max_results) emails = [] attachment_dir = Path('./downloaded_attachments') if download_attachments: attachment_dir.mkdir(exist_ok=True) for msg in messages: details = get_email_message_details(service, msg['id']) if details: if download_attachments and details.get('has_attachments'): download_attachments_parent( service, user_id='me', msg_id=msg['id'], target_dir=str(attachment_dir) ) details['attachments_downloaded'] = True details['attachment_dir'] = str(attachment_dir) emails.append(details) return { "success": True, "message": f"Retrieved {len(emails)} latest emails", "emails": emails, "attachment_downloads": download_attachments } except Exception as e: logger.error(f"Error reading latest emails: {str(e)}") return {"success": False, "message": str(e), "emails": []} @mcp.tool() async def download_email_attachments( email_identifier: str, msg_id: str, download_all_in_thread: bool = False ) -> Dict[str, Any]: """Download attachments for a specific email or its entire thread""" try: logger.info(f"Downloading attachments for email {msg_id}") service = get_gmail_service(email_identifier) attachment_dir = Path('./downloaded_attachments') attachment_dir.mkdir(exist_ok=True) if download_all_in_thread: download_attachments_all( service, user_id='me', msg_id=msg_id, target_dir=str(attachment_dir) ) else: download_attachments_parent( service, user_id='me', msg_id=msg_id, target_dir=str(attachment_dir) ) return { "success": True, "message": "Attachments downloaded successfully", "directory": str(attachment_dir), "thread_downloaded": download_all_in_thread } except Exception as e: logger.error(f"Error downloading attachments: {str(e)}") return {"success": False, "message": str(e)} # Prompts @mcp.prompt() def compose_email_prompt() -> Dict[str, Any]: """Guide for composing and sending an email""" return { "description": "Guide for composing and sending an email", "messages": [ { "role": "system", "content": """You're helping the user compose and send an email. Make sure to collect: 1. Email identifier (the account sending the email) 2. Recipient's email address 3. Subject line 4. Email body content 5. Any attachments (optional) - provide full file paths""" }, { "role": "user", "content": "I need to send an email." }, { "role": "assistant", "content": """I'll help you compose and send an email. Please provide: 1. Which email account should send this? (email identifier) 2. Who are you sending it to? (recipient's email) 3. What's the subject of your email? 4. What would you like to say in the email? 5. Do you need to attach any files? If yes, please provide the file paths.""" } ] } @mcp.prompt() def search_email_prompt() -> Dict[str, Any]: """Guide for searching emails with various criteria""" return { "description": "Guide for searching emails", "messages": [ { "role": "system", "content": """You're helping the user search through their emails. Collect: 1. Email identifier (which account to search) 2. Search criteria (from, to, subject, date range, etc.) 3. Maximum number of results needed 4. Whether to include conversation threads""" }, { "role": "user", "content": "I want to search my emails." }, { "role": "assistant", "content": """I'll help you search your emails. Please specify: 1. Which email account do you want to search? (email identifier) 2. What are you looking for? You can search by: - Sender (from:[email protected]) - Subject (subject:meeting) - Date range (after:2024/01/01 before:2024/02/01) - Has attachment (has:attachment) Or combine these criteria. 3. How many results would you like to see? (default is 30) 4. Should I include conversation threads in the search? (yes/no)""" } ] } @mcp.prompt() def read_latest_emails_prompt() -> Dict[str, Any]: """Guide for reading recent emails with optional attachment handling""" return { "description": "Guide for reading latest emails", "messages": [ { "role": "system", "content": """You're helping the user read their recent emails. Collect: 1. Email identifier (which account to read) 2. Number of emails to retrieve 3. Whether to automatically download attachments""" }, { "role": "user", "content": "I want to check my recent emails." }, { "role": "assistant", "content": """I'll help you check your recent emails. Please specify: 1. Which email account do you want to check? (email identifier) 2. How many recent emails would you like to see? (default is 5) 3. Should I automatically download any attachments found? (yes/no) Note: Attachments will be saved to a 'downloaded_attachments' folder.""" } ] } @mcp.prompt() def download_attachments_prompt() -> Dict[str, Any]: """Guide for downloading email attachments""" return { "description": "Guide for downloading email attachments", "messages": [ { "role": "system", "content": """You're helping the user download email attachments. Collect: 1. Email identifier (which account to use) 2. Message ID of the email 3. Whether to download attachments from the entire conversation thread""" }, { "role": "user", "content": "I want to download attachments from an email." }, { "role": "assistant", "content": """I'll help you download email attachments. Please provide: 1. Which email account has the attachments? (email identifier) 2. What's the Message ID of the email? (You can get this from search results) 3. Do you want to download attachments from the entire conversation thread? (yes/no) Note: Files will be saved to a 'downloaded_attachments' folder.""" } ] } if __name__ == "__main__": try: logger.info("Starting Gmail MCP server...") mcp.run() except KeyboardInterrupt: logger.info("Server shutting down gracefully...") except Exception as e: logger.error(f"Fatal server error: {str(e)}") ```