# Directory Structure

```
├── .gitignore
├── .python-version
├── Dockerfile
├── gmail_api.py
├── gmail_mcp.log
├── gmail_server.py
├── gmail_token_creator.py
├── google_apis.py
├── LICENSE
├── pyproject.toml
├── read_emails.py
├── README.md
├── search_emails.py
├── send_emails.py
├── smithery.yaml
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python virtual environments
venv/
.env

# Token files and secrets
client_secret.json
token_files/
*.log
__pycache__/
*.pyc

# Mac/Windows files
.DS_Store


```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Gmail MCP Server
[![smithery badge](https://smithery.ai/badge/@Quantum-369/Gmail-mcp-server)](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server)

A Gmail integration server built on the Model Context Protocol (MCP). It exposes Gmail API operations for reading, searching, and sending emails, and for downloading attachments, as MCP tools and resources.

## Features

- Read emails from multiple Gmail accounts
- Send emails with attachments
- Search emails with advanced query options
- Download email attachments
- Handle email conversations and threads
- Real-time email monitoring
- Support for multiple Gmail accounts

## Prerequisites

Before running the Gmail MCP server, ensure you have the following:

1. Python 3.12 or higher
2. Google Cloud Project with Gmail API enabled
3. OAuth 2.0 Client ID credentials
4. Required Python packages (specified in pyproject.toml)

## Installation

### Installing via Smithery

To install Gmail Integration Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server):

```bash
npx -y @smithery/cli install @Quantum-369/Gmail-mcp-server --client claude
```

### Manual Installation

1. Clone the repository:
```bash
git clone <your-repository-url>
cd gmail-mcp-server
```

2. Create and activate a virtual environment:
```bash
python -m venv venv
# On Windows
venv\Scripts\activate
# On Unix/MacOS
source venv/bin/activate
```

3. Install dependencies:
```bash
pip install .
```

## Running on Ubuntu

1. Install Python 3.12 and create a virtual environment if you haven't already:
   ```bash
   sudo apt install python3.12 python3.12-venv -y
   python3.12 -m venv venv
   source venv/bin/activate
   ```
2. Place your downloaded `client_secret.json` in the project root.
3. Generate an OAuth token for the Gmail account you want to use. Copy the URL
   printed by the script into a web browser and complete the sign-in process:
   ```bash
   python gmail_token_creator.py
   ```
4. Start the server:
   ```bash
   python gmail_server.py
   ```

## Setup Google Cloud Project

1. Go to the [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project or select an existing one
3. Enable the Gmail API for your project
4. Create OAuth 2.0 credentials:
   - Go to "APIs & Services" > "Credentials"
   - Click "Create Credentials" > "OAuth client ID"
   - Choose "Desktop app" as application type
   - Download the client configuration file
5. Rename the downloaded file to `client_secret.json` and place it in the project root directory

## Configuration

1. Set up email identifiers in `gmail_token_creator.py`:
```python
email_identifier = '[email protected]'  # Change this for each account
```

2. Run the token creator to authenticate your Gmail accounts:
```bash
python gmail_token_creator.py
```
The script prints an authorization URL. Copy this URL into your web browser,
complete the Google consent flow, and copy the verification code back into the
terminal if prompted. A token file will be created inside `token_files/` for
future use.

3. Repeat the process for each Gmail account you want to integrate

## Server Structure

- `gmail_server.py`: Main MCP server implementation
- `gmail_api.py`: Gmail API interaction functions
- `google_apis.py`: Google API authentication utilities
- Supporting files:
  - `read_emails.py`: Email reading functionality
  - `search_emails.py`: Email search functionality
  - `send_emails.py`: Email sending functionality

## Usage

### Starting the Server

```bash
python gmail_server.py
```

### Running with Docker

You can also build a container image using the provided `Dockerfile`:

```bash
docker build -t gmail-mcp-server .
docker run -v $(pwd)/client_secret.json:/app/client_secret.json \
           -v $(pwd)/token_files:/app/token_files gmail-mcp-server
```

The container runs the same server and stores authentication tokens in the
`token_files` directory on the host so they persist between runs.

### Available Tools

1. Send Email:
```python
await send_gmail(
    email_identifier="[email protected]",
    to="[email protected]",
    subject="Test Subject",
    body="Email body content",
    attachment_paths=["path/to/attachment"]
)
```

2. Search Emails:
```python
await search_email_tool(
    email_identifier="[email protected]",
    query="from:[email protected]",
    max_results=30,
    include_conversations=True
)
```

3. Read Latest Emails:
```python
await read_latest_emails(
    email_identifier="[email protected]",
    max_results=5,
    download_attachments=False
)
```

4. Download Attachments:
```python
await download_email_attachments(
    email_identifier="[email protected]",
    msg_id="message_id",
    download_all_in_thread=False
)
```

## Security Considerations

- Store `client_secret.json` securely and never commit it to version control
- Keep token files secure and add them to `.gitignore`
- Use environment variables for sensitive information
- Regularly rotate OAuth credentials
- Monitor API usage and set appropriate quotas

## Error Handling

The server includes comprehensive error handling and logging:
- Logs are written to `gmail_mcp.log`
- Both file and console logging are enabled
- Detailed error messages for debugging

## Contributing

1. Fork the repository
2. Create a feature branch
3. Commit your changes
4. Push to the branch
5. Create a Pull Request

## License

Apache License 2.0

## Support

For issues and feature requests, please use the GitHub issue tracker.

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "gmail-server"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "google-api-python-client>=2.160.0",
    "google-auth>=2.38.0",
    "google-auth-oauthlib>=1.2.1",
    "mcp[cli]>=1.2.1",
]

```

--------------------------------------------------------------------------------
/gmail_token_creator.py:
--------------------------------------------------------------------------------

```python

# gmail_token_creator.py
from google_apis import create_service

client_secret_file = 'client_secret.json'
API_SERVICE_NAME = 'gmail'
API_VERSION = 'v1'
SCOPES = ['https://mail.google.com/']

# Add an identifier for each account, like the email address
email_identifier = '[email protected]'  # Change this for each account
#[email protected]
#[email protected]
#[email protected]
#[email protected]
#[email protected]

# Pass the email identifier as a prefix to create unique token files
service = create_service(client_secret_file, API_SERVICE_NAME, API_VERSION, SCOPES, prefix=f'_{email_identifier}')
#print(dir(service))

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - clientSecretPath
      - emailIdentifier
    properties:
      clientSecretPath:
        type: string
        description: Path to the OAuth 2.0 client secret JSON file.
      emailIdentifier:
        type: string
        description: Email identifier for the Gmail account.
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    (config) => ({ command: 'python', args: ['gmail_server.py'], env: { CLIENT_SECRET_PATH: config.clientSecretPath, EMAIL_IDENTIFIER: config.emailIdentifier } })

```
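
The `commandFunction` above passes `CLIENT_SECRET_PATH` and `EMAIL_IDENTIFIER` to the server process as environment variables. A minimal sketch of how a Python entry point could read them follows. The helper name `load_settings` and the fallback to `client_secret.json` are assumptions for illustration only.

```python
import os
from typing import Mapping, Optional


def load_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read the two variables that smithery.yaml exports (hypothetical helper)."""
    env = os.environ if env is None else env
    return {
        # Assumed fallback: the file name the scripts in this repo hard-code
        "client_secret_path": env.get("CLIENT_SECRET_PATH", "client_secret.json"),
        # None means the caller has to supply an identifier per request
        "email_identifier": env.get("EMAIL_IDENTIFIER"),
    }


if __name__ == "__main__":
    print(load_settings())
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to exercise in tests.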

--------------------------------------------------------------------------------
/send_emails.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from gmail_api import init_gmail_service, send_email

# Configuration
client_file = 'client_secret.json'
email_identifier = '[email protected]'  # Change this for each account

# Initialize Gmail API service for the specific email account
service = init_gmail_service(client_file, prefix=f'_{email_identifier}')

# Email details
to_address = '[email protected]'
email_subject = 'MCP servers document'
email_body = 'This is a test email sent using the Gmail API.'

# Attachments
attachment_dir = Path('C:\\Users\\harsh\\Downloads\\MS projects\\MCP\\mcpdocs')
attachment_files = list(attachment_dir.glob('*'))  # Load all files from the attachments folder

# Send the email
response_email_sent = send_email(
    service=service,
    to=to_address,
    subject=email_subject,
    body=email_body, 
    body_type='plain',
    attachment_paths=attachment_files
)

# Output response
print(response_email_sent)


```
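
`send_email` in `gmail_api.py` builds a MIME message and base64url-encodes it into the `raw` field of the request body. The sketch below reproduces only that encoding step, without any network call, so the payload can be inspected offline. `build_raw_message` is a hypothetical helper name and the address is a placeholder.

```python
import base64
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_raw_message(to: str, subject: str, body: str) -> dict:
    """Return a {'raw': ...} request body like the one send_email submits."""
    message = MIMEMultipart()
    message["to"] = to
    message["subject"] = subject
    message.attach(MIMEText(body, "plain"))
    # URL-safe base64 of the serialized message, as in send_email
    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    return {"raw": raw}


if __name__ == "__main__":
    payload = build_raw_message("recipient@example.com", "Test Subject", "Email body content")
    # Decode again to confirm the round trip preserves the headers
    decoded = message_from_bytes(base64.urlsafe_b64decode(payload["raw"]))
    print(decoded["to"], "|", decoded["subject"])
```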

--------------------------------------------------------------------------------
/search_emails.py:
--------------------------------------------------------------------------------

```python
from gmail_api import init_gmail_service, get_email_message_details, search_emails, search_email_conversations

client_file = 'client_secret.json'
email_identifier = '[email protected]'  # Specify the email identifier

# Initialize Gmail API service
service = init_gmail_service(client_file, prefix=f'_{email_identifier}')

query = 'from:me'
email_messages = search_emails(service, query, max_results=30)
email_messages += search_email_conversations(service, query, max_results=30)  # Combine both

for message in email_messages:
    email_detail = get_email_message_details(service, message['id'])
    
    if email_detail:  # Check if email details were fetched successfully
        print(message['id'])
        print(f"Subject: {email_detail.get('subject', 'No Subject')}")
        print(f"Date: {email_detail.get('date', 'No Date')}")
        print(f"Label: {email_detail.get('label', 'No Label')}")
        print('Snippet: ', email_detail.get('snippet', 'No Snippet'))
        print(f"Body: {email_detail.get('body', 'No Body')}")
        print('-' * 50)
    else:
        print(f"Skipping message ID {message['id']} as it couldn't be retrieved.")


```
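
The script above concatenates message results and thread results, so an ID may appear twice, for instance when a thread shares its ID with one of the listed messages. One way to keep only the first occurrence of each ID is sketched below. `dedupe_by_id` is a hypothetical helper, and the sample dictionaries only mimic the `{'id': ..., 'threadId': ...}` shape of list results.

```python
from typing import Dict, Iterable, List


def dedupe_by_id(items: Iterable[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep the first entry seen for each 'id', preserving input order."""
    seen = set()
    unique = []
    for item in items:
        if item["id"] not in seen:
            seen.add(item["id"])
            unique.append(item)
    return unique


if __name__ == "__main__":
    messages = [{"id": "a1", "threadId": "a1"}, {"id": "b2", "threadId": "a1"}]
    threads = [{"id": "a1"}]
    print([m["id"] for m in dedupe_by_id(messages + threads)])
```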

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

# Install the project into /app
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv     --mount=type=bind,source=uv.lock,target=uv.lock     --mount=type=bind,source=pyproject.toml,target=pyproject.toml     uv sync --frozen --no-install-project --no-dev --no-editable

# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
ADD . /app
RUN --mount=type=cache,target=/root/.cache/uv     uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

WORKDIR /app

COPY --from=uv /root/.local /root/.local
# Copy the project source together with its virtual environment;
# the entrypoint below expects gmail_server.py in /app
COPY --from=uv /app /app

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# when running the container, run the main server script
ENTRYPOINT ["python", "gmail_server.py"]

```

--------------------------------------------------------------------------------
/read_emails.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from gmail_api import init_gmail_service, get_email_messages, get_email_message_details, download_attachments_parent

client_file = 'client_secret.json'
email_identifier = '[email protected]'  # Specify the email identifier

# Initialize Gmail API service
service = init_gmail_service(client_file, prefix=f'_{email_identifier}')

# Correctly unpack the messages and next_page_token
messages, _ = get_email_messages(service, max_results=5)

# Target directory to save attachments
attachment_dir = Path('./downloaded_attachments')  # Folder to save attachments
attachment_dir.mkdir(exist_ok=True)  # Create the folder if it doesn't exist

# Process Emails
for msg in messages:
    details = get_email_message_details(service, msg['id'])
    if details:
        print(f"Subject: {details['subject']}")
        print(f"From: {details['sender']}")
        print(f"Recipients: {details['recipients']}")
        print(f"Body: {details['body'][:100]}...")  # Print first 100 characters of the body
        print(f"Snippet: {details['snippet']}")
        print(f"Has Attachments: {details['has_attachments']}")
        print(f"Date: {details['date']}")
        print(f"Star: {details['star']}")
        print(f"Label: {details['label']}")
        print("-" * 50)

        # Download Attachments if present
        if details['has_attachments']:
            download_attachments_parent(service, user_id='me', msg_id=msg['id'], target_dir=str(attachment_dir))

```
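
The script above saves each attachment under `./downloaded_attachments` using the file name carried in the message. Because that name is chosen by the sender, it may be worth stripping directory components before joining paths. The following is a minimal sketch under that assumption. `safe_attachment_path` and the `attachment` fallback name are hypothetical.

```python
from pathlib import Path


def safe_attachment_path(target_dir: str, filename: str) -> Path:
    """Join target_dir with only the final component of filename."""
    # Treat both separators alike so Windows-style names are covered too
    name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    if name in ("", ".", ".."):
        name = "attachment"  # assumed fallback for unusable input
    return Path(target_dir) / name


if __name__ == "__main__":
    print(safe_attachment_path("downloaded_attachments", "../../report.pdf"))
```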

--------------------------------------------------------------------------------
/google_apis.py:
--------------------------------------------------------------------------------

```python
# google_apis.py
import os
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request

def create_service(client_secret_file, api_name, api_version, *scopes, prefix=''):
    CLIENT_SECRET_FILE = client_secret_file
    API_SERVICE_NAME = api_name
    API_VERSION = api_version

    SCOPES = [scope for scope in scopes[0]]

    creds = None
    working_dir = os.getcwd()
    token_dir = 'token_files'

    # Include the prefix (email identifier) in the token file name
    token_file = f'token_{API_SERVICE_NAME}_{API_VERSION}{prefix}.json'

    # Check if the token directory exists, create if not
    if not os.path.exists(os.path.join(working_dir, token_dir)):
        os.mkdir(os.path.join(working_dir, token_dir))

    # Load existing credentials if available
    if os.path.exists(os.path.join(working_dir, token_dir, token_file)):
        creds = Credentials.from_authorized_user_file(
            os.path.join(working_dir, token_dir, token_file), SCOPES
        )

    # If no valid credentials, initiate the authentication flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
            creds = flow.run_local_server(port=0)

        # Save the credentials for future use
        with open(os.path.join(working_dir, token_dir, token_file), 'w') as token:
            token.write(creds.to_json())

    try:
        service = build(API_SERVICE_NAME, API_VERSION, credentials=creds, static_discovery=False)
        print(f'{API_SERVICE_NAME} {API_VERSION} service created successfully for {prefix}')
        return service
    except Exception as e:
        print(e)
        print(f'Failed to create service instance for {API_SERVICE_NAME}')
        # Remove corrupted token file if exists
        if os.path.exists(os.path.join(working_dir, token_dir, token_file)):
            os.remove(os.path.join(working_dir, token_dir, token_file))
        return None

```
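
`create_service` keeps one token file per account by appending the `prefix` argument to the file name inside `token_files/`. The sketch below isolates that naming rule with `pathlib`. `token_path` is a hypothetical helper, the identifier is a placeholder, and the path is left relative, whereas `create_service` resolves it against the current working directory.

```python
from pathlib import Path


def token_path(api_name: str, api_version: str, prefix: str = "",
               base_dir: Path = Path("token_files")) -> Path:
    """Mirror the token file name that create_service builds."""
    return base_dir / f"token_{api_name}_{api_version}{prefix}.json"


if __name__ == "__main__":
    # The scripts in this repo pass prefix=f'_{email_identifier}'
    print(token_path("gmail", "v1", prefix="_user@example.com"))
```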

--------------------------------------------------------------------------------
/gmail_api.py:
--------------------------------------------------------------------------------

```python
import os
import base64
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
from google_apis import create_service

def init_gmail_service(client_file, api_name='gmail', api_version='v1', scopes=['https://mail.google.com/'], prefix=''):
    return create_service(client_file, api_name, api_version, scopes, prefix=prefix)


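def _extract_body(payload):
    """Return the first decodable text body found in a Gmail message payload."""
    body = '<text body not available>'
    if 'parts' in payload:
        for part in payload['parts']:
            if part['mimeType'] == 'multipart/alternative':
                for subpart in part.get('parts', []):
                    if subpart['mimeType'] == 'text/plain' and 'data' in subpart['body']:
                        body = base64.urlsafe_b64decode(subpart['body']['data']).decode('utf-8')
                        break
            # Use .get(): attachment parts carry 'attachmentId' instead of 'data'
            elif part.get('body', {}).get('data'):
                body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                break
    # Single-part messages keep their content directly on the payload
    elif payload.get('body', {}).get('data'):
        body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8')
    return body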

def get_email_messages(service, user_id='me', label_ids=None, folder_name='INBOX', max_results=5):
    messages = []
    next_page_token = None

    if folder_name:
        label_results = service.users().labels().list(userId=user_id).execute()
        labels = label_results.get('labels', [])
        folder_label_id = next((label['id'] for label in labels if label['name'].lower() == folder_name.lower()), None)

        if folder_label_id:
            message_response = service.users().messages().list(userId=user_id, labelIds=[folder_label_id], maxResults=max_results).execute()
            messages.extend(message_response.get('messages', []))
            next_page_token = message_response.get('nextPageToken', None)

    return messages, next_page_token

def get_email_message_details(service, msg_id):
    try:
        message = service.users().messages().get(userId='me', id=msg_id).execute()
        payload = message['payload']
        headers = payload.get('headers', [])
        
        subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No subject')
        
        sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'No sender')
        
        recipients = next((header['value'] for header in headers if header['name'].lower() == 'to'), 'No recipients')
        
        snippet = message.get('snippet', 'No snippet')
        
        has_attachments = any(part.get('filename') for part in payload.get('parts', []) if part.get('filename'))
        
        date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'No date')
        
        star = message.get('labelIds', []).count('STARRED') > 0
        
        label = ', '.join(message.get('labelIds', []))
        
        body = _extract_body(payload)
        
        return {
            'subject': subject,
            'sender': sender,
            'recipients': recipients,
            'body': body,
            'snippet': snippet,
            'has_attachments': has_attachments,
            'date': date,
            'star': star,
            'label': label
        }
    except Exception as e:
        print(f'Error getting email message details: {e}')
        return None

def send_email(service, to, subject, body, body_type='plain', attachment_paths=None):
    # MIME helpers, base64, os and Path are already imported at module level
    import mimetypes

    message = MIMEMultipart()
    message['to'] = to
    message['subject'] = subject

    message.attach(MIMEText(body, body_type))

    # Handle attachments
    if attachment_paths:
        for attachment_path in attachment_paths:
            attachment_path = Path(attachment_path)  # Ensure it's a Path object
            if not attachment_path.exists():
                raise FileNotFoundError(f"File not found - {attachment_path}")

            # Guess the MIME type based on file extension
            content_type, encoding = mimetypes.guess_type(attachment_path)
            if content_type is None or encoding is not None:
                content_type = 'application/octet-stream'

            main_type, sub_type = content_type.split('/', 1)

            with open(attachment_path, 'rb') as attachment_file:
                part = MIMEBase(main_type, sub_type)
                part.set_payload(attachment_file.read())
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition',
                    f'attachment; filename="{attachment_path.name}"'
                )
                message.attach(part)

    # Encode message and send
    raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
    send_message = {'raw': raw_message}

    try:
        sent_message = service.users().messages().send(userId='me', body=send_message).execute()
        print(f"Email sent successfully to {to} with attachments.")
        return sent_message
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def download_attachments_parent(service, user_id, msg_id, target_dir):
    message = service.users().messages().get(userId=user_id, id=msg_id).execute()
    # Single-part messages have no 'parts' key, so fall back to an empty list
    for part in message['payload'].get('parts', []):
        att_id = part.get('body', {}).get('attachmentId')
        # Skip parts that are not stored as separate attachments
        if part.get('filename') and att_id:
            att = service.users().messages().attachments().get(userId=user_id, messageId=msg_id, id=att_id).execute()
            data = att['data']
            file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))
            # basename() keeps a crafted file name from escaping target_dir
            file_path = os.path.join(target_dir, os.path.basename(part['filename']))
            print('Saving attachment to:', file_path)
            with open(file_path, 'wb') as f:
                f.write(file_data)

def download_attachments_all(service, user_id, msg_id, target_dir):
    # msg_id is passed to threads().get() and is therefore treated as a thread ID
    thread = service.users().threads().get(userId=user_id, id=msg_id).execute()
    for message in thread['messages']:
        # Single-part messages have no 'parts' key, so fall back to an empty list
        for part in message['payload'].get('parts', []):
            att_id = part.get('body', {}).get('attachmentId')
            # Skip parts that are not stored as separate attachments
            if part.get('filename') and att_id:
                att = service.users().messages().attachments().get(userId=user_id, messageId=message['id'], id=att_id).execute()
                data = att['data']
                file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))
                # basename() keeps a crafted file name from escaping target_dir
                file_path = os.path.join(target_dir, os.path.basename(part['filename']))
                print('Saving attachment to:', file_path)
                with open(file_path, 'wb') as f:
                    f.write(file_data)

def search_emails(service, query, user_id='me', max_results=5):
    messages = []
    next_page_token = None

    while True:
        result = service.users().messages().list(
            userId=user_id,
            q=query,
            maxResults=min(500, max_results - len(messages)) if max_results else 500,
            pageToken=next_page_token
        ).execute()

        messages.extend(result.get('messages', []))

        next_page_token = result.get('nextPageToken')

        if not next_page_token or (max_results and len(messages) >= max_results):
            break

    return messages[:max_results] if max_results else messages

def search_email_conversations(service, query, user_id='me', max_results=5):
    conversations = []
    next_page_token = None

    while True:
        result = service.users().threads().list(
            userId=user_id,
            q=query,
            maxResults=min(500, max_results - len(conversations)) if max_results else 500,
            pageToken=next_page_token
        ).execute()

        conversations.extend(result.get('threads', []))

        next_page_token = result.get('nextPageToken')

        if not next_page_token or (max_results and len(conversations) >= max_results):
            break

    return conversations[:max_results] if max_results else conversations

```
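
`search_emails` and `search_email_conversations` share the same loop: request a page, extend the results, and stop when `nextPageToken` is absent or `max_results` is reached. The sketch below expresses that loop against a plain callable so it can be exercised without Gmail credentials. `collect_pages` and `fake_fetch` are hypothetical names, the seven-item data set is made up, and the 500-per-page cap is taken from the functions above.

```python
from typing import Callable, Dict, List, Optional


def collect_pages(fetch_page: Callable[[Optional[str], int], Dict],
                  key: str, max_results: Optional[int] = None) -> List[Dict]:
    """Follow nextPageToken until exhausted or max_results items are collected."""
    items: List[Dict] = []
    token: Optional[str] = None
    while True:
        page_size = min(500, max_results - len(items)) if max_results else 500
        result = fetch_page(token, page_size)
        items.extend(result.get(key, []))
        token = result.get("nextPageToken")
        if not token or (max_results and len(items) >= max_results):
            break
    return items[:max_results] if max_results else items


def fake_fetch(token: Optional[str], page_size: int) -> Dict:
    """Stand-in for a list(...).execute() call: 7 items, at most 3 per page."""
    start = int(token or 0)
    ids = list(range(start, min(start + min(page_size, 3), 7)))
    page = {"messages": [{"id": str(i)} for i in ids]}
    if ids and ids[-1] < 6:
        page["nextPageToken"] = str(ids[-1] + 1)
    return page


if __name__ == "__main__":
    print(len(collect_pages(fake_fetch, "messages", max_results=5)))
```

Taking the fetch step as a parameter lets the same loop serve both the message and thread listings by switching `key` between `messages` and `threads`.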

--------------------------------------------------------------------------------
/gmail_server.py:
--------------------------------------------------------------------------------

```python
import os
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
from mcp.server.fastmcp import FastMCP, Context

from gmail_api import (
    init_gmail_service,
    get_email_message_details,
    search_emails,
    search_email_conversations,
    send_email,
    get_email_messages,
    download_attachments_parent,
    download_attachments_all
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gmail_mcp.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('gmail_mcp')

# Initialize MCP Server
mcp = FastMCP(
    "Gmail MCP Server",
    dependencies=["google-api-python-client", "google-auth-oauthlib"]
)

# Gmail Service Initialization
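# Honor the CLIENT_SECRET_PATH variable that smithery.yaml exports; default to the local file
CLIENT_FILE = os.environ.get('CLIENT_SECRET_PATH', 'client_secret.json')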

def get_gmail_service(email_identifier: str):
    try:
        service = init_gmail_service(CLIENT_FILE, prefix=f'_{email_identifier}')
        if not service:
            raise ValueError(f"Failed to initialize Gmail service for {email_identifier}")
        return service
    except Exception as e:
        logger.error(f"Error initializing Gmail service: {str(e)}")
        raise

# Resources
@mcp.resource("gmail://inbox/{email_identifier}")
async def get_inbox(email_identifier: str) -> Dict[str, Any]:
    """Get latest emails from inbox"""
    try:
        logger.info(f"Fetching inbox for {email_identifier}")
        service = get_gmail_service(email_identifier)
        messages, next_page = get_email_messages(service, max_results=10)
        emails = []
        for msg in messages:
            details = get_email_message_details(service, msg['id'])
            if details:
                emails.append(details)
        return {
            "success": True,
            "emails": emails,
            "has_more": bool(next_page)
        }
    except Exception as e:
        logger.error(f"Error fetching inbox: {str(e)}")
        return {"success": False, "message": str(e)}

@mcp.resource("gmail://email/{email_identifier}/{msg_id}")
async def get_email_details(email_identifier: str, msg_id: str) -> Dict[str, Any]:
    """Get detailed information about a specific email"""
    try:
        logger.info(f"Fetching email details for ID {msg_id}")
        service = get_gmail_service(email_identifier)
        details = get_email_message_details(service, msg_id)
        if details:
            return {"success": True, "email": details}
        return {"success": False, "message": "Email not found"}
    except Exception as e:
        logger.error(f"Error fetching email details: {str(e)}")
        return {"success": False, "message": str(e)}

@mcp.resource("gmail://attachments/{email_identifier}/{msg_id}")
async def list_attachments(email_identifier: str, msg_id: str) -> Dict[str, Any]:
    """List attachments for a specific email"""
    try:
        logger.info(f"Listing attachments for email {msg_id}")
        service = get_gmail_service(email_identifier)
        details = get_email_message_details(service, msg_id)
        if details and details.get('has_attachments'):
            return {
                "success": True,
                "has_attachments": True,
                "message_id": msg_id
            }
        return {
            "success": True,
            "has_attachments": False
        }
    except Exception as e:
        logger.error(f"Error listing attachments: {str(e)}")
        return {"success": False, "message": str(e)}

# Tools
@mcp.tool()
async def send_gmail(
    email_identifier: str, 
    to: str, 
    subject: str, 
    body: str, 
    attachment_paths: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Send an email with optional attachments"""
    try:
        logger.info(f"Sending email to {to} from {email_identifier}")
        service = get_gmail_service(email_identifier)
        
        # Validate attachment paths
        if attachment_paths:
            for path in attachment_paths:
                if not os.path.exists(path):
                    return {
                        "success": False,
                        "message": f"Attachment not found: {path}"
                    }
        
        response = send_email(
            service=service,
            to=to,
            subject=subject,
            body=body,
            body_type='plain',
            attachment_paths=attachment_paths
        )
        
        if response:
            return {
                "success": True,
                "message": f"Email sent successfully to {to}",
                "message_id": response.get('id', 'unknown')
            }
        return {
            "success": False,
            "message": "Failed to send email"
        }
    except Exception as e:
        logger.error(f"Error sending email: {str(e)}")
        return {"success": False, "message": str(e)}

@mcp.tool()
async def search_email_tool(
    email_identifier: str,
    query: str = '',
    max_results: int = 30,
    include_conversations: bool = True
) -> Dict[str, Any]:
    """Search emails with optional conversation inclusion"""
    try:
        logger.info(f"Searching emails for {email_identifier} with query: {query}")
        service = get_gmail_service(email_identifier)
        
        emails = []
        # Search regular emails
        messages = search_emails(service, query, max_results=max_results)
        for msg in messages:
            details = get_email_message_details(service, msg['id'])
            if details:
                emails.append(details)
                
        # Search conversations if requested
        if include_conversations:
            conversations = search_email_conversations(service, query, max_results=max_results)
            for conv in conversations:
                details = get_email_message_details(service, conv['id'])
                if details:
                    emails.append(details)
                    
        return {
            "success": True,
            "message": f"Found {len(emails)} emails",
            "emails": emails
        }
    except Exception as e:
        logger.error(f"Error searching emails: {str(e)}")
        return {"success": False, "message": str(e), "emails": []}

@mcp.tool()
async def read_latest_emails(
    email_identifier: str,
    max_results: int = 5,
    download_attachments: bool = False
) -> Dict[str, Any]:
    """Read latest emails with optional attachment download"""
    try:
        logger.info(f"Reading latest {max_results} emails for {email_identifier}")
        service = get_gmail_service(email_identifier)
        
        messages, _ = get_email_messages(service, max_results=max_results)
        emails = []
        
        attachment_dir = Path('./downloaded_attachments')
        if download_attachments:
            attachment_dir.mkdir(exist_ok=True)
            
        for msg in messages:
            details = get_email_message_details(service, msg['id'])
            if details:
                if download_attachments and details.get('has_attachments'):
                    download_attachments_parent(
                        service, 
                        user_id='me',
                        msg_id=msg['id'],
                        target_dir=str(attachment_dir)
                    )
                    details['attachments_downloaded'] = True
                    details['attachment_dir'] = str(attachment_dir)
                emails.append(details)
        
        return {
            "success": True,
            "message": f"Retrieved {len(emails)} latest emails",
            "emails": emails,
            "attachment_downloads": download_attachments
        }
    except Exception as e:
        logger.error(f"Error reading latest emails: {str(e)}")
        return {"success": False, "message": str(e), "emails": []}

@mcp.tool()
async def download_email_attachments(
    email_identifier: str,
    msg_id: str,
    download_all_in_thread: bool = False
) -> Dict[str, Any]:
    """Download attachments for a specific email or its entire thread"""
    try:
        logger.info(f"Downloading attachments for email {msg_id}")
        service = get_gmail_service(email_identifier)
        
        attachment_dir = Path('./downloaded_attachments')
        attachment_dir.mkdir(exist_ok=True)
        
        if download_all_in_thread:
            download_attachments_all(
                service,
                user_id='me',
                msg_id=msg_id,
                target_dir=str(attachment_dir)
            )
        else:
            download_attachments_parent(
                service,
                user_id='me',
                msg_id=msg_id,
                target_dir=str(attachment_dir)
            )
            
        return {
            "success": True,
            "message": "Attachments downloaded successfully",
            "directory": str(attachment_dir),
            "thread_downloaded": download_all_in_thread
        }
    except Exception as e:
        logger.error(f"Error downloading attachments: {str(e)}")
        return {"success": False, "message": str(e)}

# Prompts
@mcp.prompt()
def compose_email_prompt() -> Dict[str, Any]:
    """Guide for composing and sending an email"""
    return {
        "description": "Guide for composing and sending an email",
        "messages": [
            {
                "role": "system",
                "content": """You're helping the user compose and send an email. Make sure to collect:
1. Email identifier (the account sending the email)
2. Recipient's email address
3. Subject line
4. Email body content
5. Any attachments (optional) - provide full file paths"""
            },
            {
                "role": "user",
                "content": "I need to send an email."
            },
            {
                "role": "assistant",
                "content": """I'll help you compose and send an email. Please provide:

1. Which email account should send this? (email identifier)
2. Who are you sending it to? (recipient's email)
3. What's the subject of your email?
4. What would you like to say in the email?
5. Do you need to attach any files? If yes, please provide the file paths."""
            }
        ]
    }

@mcp.prompt()
def search_email_prompt() -> Dict[str, Any]:
    """Guide for searching emails with various criteria"""
    return {
        "description": "Guide for searching emails",
        "messages": [
            {
                "role": "system",
                "content": """You're helping the user search through their emails. Collect:
1. Email identifier (which account to search)
2. Search criteria (from, to, subject, date range, etc.)
3. Maximum number of results needed
4. Whether to include conversation threads"""
            },
            {
                "role": "user",
                "content": "I want to search my emails."
            },
            {
                "role": "assistant",
                "content": """I'll help you search your emails. Please specify:

1. Which email account do you want to search? (email identifier)
2. What are you looking for? You can search by:
   - Sender (from:sender@example.com)
   - Subject (subject:meeting)
   - Date range (after:2024/01/01 before:2024/02/01)
   - Has attachment (has:attachment)
   Or combine these criteria.
3. How many results would you like to see? (default is 30)
4. Should I include conversation threads in the search? (yes/no)"""
            }
        ]
    }

@mcp.prompt()
def read_latest_emails_prompt() -> Dict[str, Any]:
    """Guide for reading recent emails with optional attachment handling"""
    return {
        "description": "Guide for reading latest emails",
        "messages": [
            {
                "role": "system",
                "content": """You're helping the user read their recent emails. Collect:
1. Email identifier (which account to read)
2. Number of emails to retrieve
3. Whether to automatically download attachments"""
            },
            {
                "role": "user",
                "content": "I want to check my recent emails."
            },
            {
                "role": "assistant",
                "content": """I'll help you check your recent emails. Please specify:

1. Which email account do you want to check? (email identifier)
2. How many recent emails would you like to see? (default is 5)
3. Should I automatically download any attachments found? (yes/no)
   Note: Attachments will be saved to a 'downloaded_attachments' folder."""
            }
        ]
    }

@mcp.prompt()
def download_attachments_prompt() -> Dict[str, Any]:
    """Guide for downloading email attachments"""
    return {
        "description": "Guide for downloading email attachments",
        "messages": [
            {
                "role": "system",
                "content": """You're helping the user download email attachments. Collect:
1. Email identifier (which account to use)
2. Message ID of the email
3. Whether to download attachments from the entire conversation thread"""
            },
            {
                "role": "user",
                "content": "I want to download attachments from an email."
            },
            {
                "role": "assistant",
                "content": """I'll help you download email attachments. Please provide:

1. Which email account has the attachments? (email identifier)
2. What's the Message ID of the email? (You can get this from search results)
3. Do you want to download attachments from the entire conversation thread? (yes/no)
   Note: Files will be saved to a 'downloaded_attachments' folder."""
            }
        ]
    }

if __name__ == "__main__":
    try:
        logger.info("Starting Gmail MCP server...")
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Server shutting down gracefully...")
    except Exception as e:
        logger.error(f"Fatal server error: {str(e)}")
        # Exit with a non-zero status so a supervisor can detect the crash
        raise SystemExit(1)

```
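
The search prompt in `gmail_server.py` lists the Gmail operators a caller can combine in the `query` argument (`from:`, `subject:`, `after:`, `before:`, `has:attachment`). As a minimal sketch, and not part of this repository, a helper along these lines could assemble that string before it is passed to `search_email_tool`. The function name `build_gmail_query` and its parameters are hypothetical and exist only for illustration.

```python
from typing import Optional


def build_gmail_query(
    sender: Optional[str] = None,
    subject: Optional[str] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    has_attachment: bool = False,
) -> str:
    """Join the supplied criteria into one Gmail search string (hypothetical helper)."""
    parts = []
    if sender:
        parts.append(f"from:{sender}")
    if subject:
        # Quote multi-word subjects so the operator covers the whole phrase
        value = f'"{subject}"' if " " in subject else subject
        parts.append(f"subject:{value}")
    if after:
        parts.append(f"after:{after}")    # dates in YYYY/MM/DD form, as in the prompt
    if before:
        parts.append(f"before:{before}")
    if has_attachment:
        parts.append("has:attachment")
    # Space-separated terms are combined by Gmail as an implicit AND
    return " ".join(parts)


query = build_gmail_query(subject="meeting", after="2024/01/01", has_attachment=True)
print(query)
```

The resulting string can be supplied as `query`. Calling the helper with no arguments yields an empty string, which matches the tool's default of `query=''`.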