#
tokens: 11138/50000 8/8 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── Dockerfile
├── LICENSE
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── server.js
├── src
│   └── mcp_server_headless_gmail
│       ├── __init__.py
│       └── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Environment variables
.env

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual Environment
venv/
env/
ENV/

# IDE
.idea/
.vscode/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db 

# Node
node_modules/

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP Headless Gmail Server (NPM & Docker)

[![npm version](https://img.shields.io/npm/v/@peakmojo/mcp-server-headless-gmail.svg)](https://www.npmjs.com/package/@peakmojo/mcp-server-headless-gmail) [![Docker Pulls](https://img.shields.io/docker/pulls/buryhuang/mcp-headless-gmail)](https://hub.docker.com/r/buryhuang/mcp-headless-gmail) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A MCP (Model Context Protocol) server that provides get, send Gmails without local credential or token setup.

<a href="https://glama.ai/mcp/servers/@baryhuang/mcp-headless-gmail">
  <img width="380" height="200" src="https://glama.ai/mcp/servers/@baryhuang/mcp-headless-gmail/badge" alt="Headless Gmail Server MCP server" />
</a>

## Why MCP Headless Gmail Server?
### Critical Advantages
- **Headless & Remote Operation**: Unlike other MCP Gmail solutions that require running outside of docker and local file access, this server can run completely headless in remote environments with no browser no local file access.
- **Decoupled Architecture**: Any client can complete the OAuth flow independently, then pass credentials as context to this MCP server, creating a complete separation between credential storage and server implementation.

### Nice but not critical
- **Focused Functionality**: In many use cases, especially for marketing applications, only Gmail access is needed without additional Google services like Calendar, making this focused implementation ideal.
- **Docker-Ready**: Designed with containerization in mind for a well-isolated, environment-independent, one-click setup.
- **Reliable Dependencies**: Built on the well-maintained google-api-python-client library.

## Features

- Get most recent emails from Gmail with the first 1k characters of the body
- Get full email body content in 1k chunks using offset parameter
- Send emails through Gmail
- Refresh access tokens separately
- Automatic refresh token handling

## Prerequisites

- Python 3.10 or higher
- Google API credentials (client ID, client secret, access token, and refresh token)

## Installation

```bash
# Clone the repository
git clone https://github.com/baryhuang/mcp-headless-gmail.git
cd mcp-headless-gmail

# Install dependencies
pip install -e .
```

## Docker

### Building the Docker Image

```bash
# Build the Docker image
docker build -t mcp-headless-gmail .
```

## Usage with Claude Desktop

You can configure Claude Desktop to use the Docker image by adding the following to your Claude configuration:

docker
```json
{
  "mcpServers": {
    "gmail": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "buryhuang/mcp-headless-gmail:latest"
      ]
    }
  }
}
```

npm version
```json
{
  "mcpServers": {
    "gmail": {
      "command": "npx",
      "args": [
        "@peakmojo/mcp-server-headless-gmail"
      ]
    }
  }
}
```

Note: With this configuration, you'll need to provide your Google API credentials in the tool calls as shown in the [Using the Tools](#using-the-tools) section. Gmail credentials are not passed as environment variables to maintain separation between credential storage and server implementation.

## Cross-Platform Publishing

To publish the Docker image for multiple platforms, you can use the `docker buildx` command. Follow these steps:

1. **Create a new builder instance** (if you haven't already):
   ```bash
   docker buildx create --use
   ```

2. **Build and push the image for multiple platforms**:
   ```bash
   docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t buryhuang/mcp-headless-gmail:latest --push .
   ```

3. **Verify the image is available for the specified platforms**:
   ```bash
   docker buildx imagetools inspect buryhuang/mcp-headless-gmail:latest
   ```

## Usage

The server provides Gmail functionality through MCP tools. Authentication handling is simplified with a dedicated token refresh tool.

### Starting the Server

```bash
mcp-server-headless-gmail
```

### Using the Tools

When using an MCP client like Claude, you have two main ways to handle authentication:

#### Refreshing Tokens (First Step or When Tokens Expire)

If you have both access and refresh tokens:
```json
{
  "google_access_token": "your_access_token",
  "google_refresh_token": "your_refresh_token",
  "google_client_id": "your_client_id",
  "google_client_secret": "your_client_secret"
}
```

If your access token has expired, you can refresh with just the refresh token:
```json
{
  "google_refresh_token": "your_refresh_token",
  "google_client_id": "your_client_id",
  "google_client_secret": "your_client_secret"
}
```

This will return a new access token and its expiration time, which you can use for subsequent calls.

#### Getting Recent Emails

Retrieves recent emails with the first 1k characters of each email body:

```json
{
  "google_access_token": "your_access_token",
  "max_results": 5,
  "unread_only": false
}
```

Response includes:
- Email metadata (id, threadId, from, to, subject, date, etc.)
- First 1000 characters of the email body
- `body_size_bytes`: Total size of the email body in bytes
- `contains_full_body`: Boolean indicating if the entire body is included (true) or truncated (false)

#### Getting Full Email Body Content

For emails with bodies larger than 1k characters, you can retrieve the full content in chunks:

```json
{
  "google_access_token": "your_access_token",
  "message_id": "message_id_from_get_recent_emails",
  "offset": 0
}
```

You can also get email content by thread ID:

```json
{
  "google_access_token": "your_access_token",
  "thread_id": "thread_id_from_get_recent_emails",
  "offset": 1000
}
```

The response includes:
- A 1k chunk of the email body starting from the specified offset
- `body_size_bytes`: Total size of the email body
- `chunk_size`: Size of the returned chunk
- `contains_full_body`: Boolean indicating if the chunk contains the remainder of the body

To retrieve the entire email body of a long message, make sequential calls increasing the offset by 1000 each time until `contains_full_body` is true.

#### Sending an Email

```json
{
  "google_access_token": "your_access_token",
  "to": "[email protected]",
  "subject": "Hello from MCP Gmail",
  "body": "This is a test email sent via MCP Gmail server",
  "html_body": "<p>This is a <strong>test email</strong> sent via MCP Gmail server</p>"
}
```

### Token Refresh Workflow

1. Start by calling the `gmail_refresh_token` tool with either:
   - Your full credentials (access token, refresh token, client ID, and client secret), or
   - Just your refresh token, client ID, and client secret if the access token has expired
2. Use the returned new access token for subsequent API calls.
3. If you get a response indicating token expiration, call the `gmail_refresh_token` tool again to get a new token.

This approach simplifies most API calls by not requiring client credentials for every operation, while still enabling token refresh when needed.

## Obtaining Google API Credentials

To obtain the required Google API credentials, follow these steps:

1. Go to the [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project
3. Enable the Gmail API
4. Configure OAuth consent screen
5. Create OAuth client ID credentials (select "Desktop app" as the application type)
6. Save the client ID and client secret
7. Use OAuth 2.0 to obtain access and refresh tokens with the following scopes:
   - `https://www.googleapis.com/auth/gmail.readonly` (for reading emails)
   - `https://www.googleapis.com/auth/gmail.send` (for sending emails)

## Token Refreshing

This server implements automatic token refreshing. When your access token expires, the Google API client will use the refresh token, client ID, and client secret to obtain a new access token without requiring user intervention.

## Security Note

This server requires direct access to your Google API credentials. Always keep your tokens and credentials secure and never share them with untrusted parties.

## License

See the LICENSE file for details.

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use Python base image
FROM python:3.11-slim

# Install the project into `/app`
WORKDIR /app

# Copy the entire project
COPY . /app

# Install the package
RUN pip install --no-cache-dir -e .

# Run the server
ENTRYPOINT ["mcp-server-headless-gmail"] 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-headless-gmail"
version = "0.1.0"
description = "A simple Gmail MCP server"
readme = "README.md"
requires-python = ">=3.10"
dependencies = ["mcp>=1.4.1", "python-dotenv>=1.0.1", "google-api-python-client>=2.127.0", "google-auth>=2.34.0", "google-auth-oauthlib>=1.2.0", "google-auth-httplib2>=0.2.0", "python-dateutil>=2.8.2"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv]
dev-dependencies = ["pyright>=1.1.389"]

[project.scripts]
mcp-server-headless-gmail = "mcp_server_headless_gmail:main" 
```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "@peakmojo/mcp-server-headless-gmail",
  "version": "0.1.0",
  "description": "A simple Gmail MCP server (no authentication flow)",
  "type": "module",
  "publishConfig": {
    "access": "public"
  },
  "main": "server.js",
  "bin": {
    "mcp-server-headless-gmail": "server.js"
  },
  "scripts": {
    "start": "node server.js"
  },
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.4.1",
    "googleapis": "^133.0.0",
    "luxon": "^3.4.4",
    "zod": "^3.22.4",
    "node-fetch": "^3.3.2"
  },
  "engines": {
    "node": ">=18.0.0"
  },
  "license": "MIT"
} 
```

--------------------------------------------------------------------------------
/src/mcp_server_headless_gmail/__init__.py:
--------------------------------------------------------------------------------

```python
import argparse
import asyncio
import logging
from . import server

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('mcp_headless_gmail')

def main():
    logger.debug("Starting mcp-server-headless-gmail main()")
    parser = argparse.ArgumentParser(description='Headless Gmail MCP Server')
    args = parser.parse_args()
    
    # Run the async main function
    logger.debug("About to run server.main()")
    asyncio.run(server.main())
    logger.debug("Server main() completed")

if __name__ == "__main__":
    main()

# Expose important items at package level
__all__ = ["main", "server"] 
```

--------------------------------------------------------------------------------
/server.js:
--------------------------------------------------------------------------------

```javascript
#!/usr/bin/env node

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';
import { google } from 'googleapis';
import { Buffer } from 'buffer';
import { DateTime } from 'luxon';

const logLevels = {
  ERROR: 0,
  WARN: 1,
  INFO: 2,
  DEBUG: 3
};

class Logger {
  constructor(name) {
    this.name = name;
    this.logLevel = process.env.LOG_LEVEL ? 
      logLevels[(process.env.LOG_LEVEL || 'INFO').toUpperCase()] : 
      logLevels.INFO;
  }

  log(level, message) {
    if (logLevels[level] <= this.logLevel) {
      const timestamp = new Date().toISOString();
      console.error(`${timestamp} - ${this.name} - ${level} - ${message}`);
    }
  }

  info(message) { this.log('INFO', message); }
  warn(message) { this.log('WARN', message); }
  error(message) { this.log('ERROR', message); }
  debug(message) { this.log('DEBUG', message); }
}

const logger = new Logger('gmail-mcp');

class GmailClient {
  constructor({ accessToken, refreshToken, clientId, clientSecret } = {}) {
    if (!accessToken && !refreshToken) {
      throw new Error('Either accessToken or refreshToken must be provided');
    }
    this.accessToken = accessToken;
    this.refreshToken = refreshToken;
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.tokenUri = 'https://oauth2.googleapis.com/token';

    // Always create the OAuth2 client
    this.oauth2Client = new google.auth.OAuth2(clientId, clientSecret);

    // Set credentials if available
    this.oauth2Client.setCredentials({
      access_token: accessToken,
      refresh_token: refreshToken,
      client_id: clientId,
      client_secret: clientSecret
    });

    // Create the Gmail API client with the OAuth2 client
    this.gmail = google.gmail({
      version: 'v1',
      auth: this.oauth2Client
    });
  }

  async _handleTokenRefresh(operation) {
    try {
      return await operation();
    } catch (error) {
      logger.error(`Request error: ${error.message}`);
      if (error.response) {
        const statusCode = error.response.status;
        if (statusCode === 401) {
          return JSON.stringify({
            error: 'Unauthorized. Token might be expired. Try refreshing your token.',
            details: error.message
          });
        } else {
          return JSON.stringify({
            error: `Gmail API error: ${statusCode}`,
            details: error.message
          });
        }
      }
      return JSON.stringify({
        error: 'Request to Gmail API failed',
        details: error.message
      });
    }
  }

  async refreshAccessToken(clientId, clientSecret) {
    logger.debug(`Starting refreshAccessToken with clientId=${clientId?.slice(0, 5)}...`);
    if (!this.refreshToken) {
      return JSON.stringify({
        error: 'No refresh token provided',
        status: 'error'
      });
    }
    try {
      const oauth2Client = new google.auth.OAuth2(clientId, clientSecret);
      oauth2Client.setCredentials({ refresh_token: this.refreshToken });
      const { credentials } = await oauth2Client.refreshAccessToken();
      this.accessToken = credentials.access_token;
      const expiresIn = credentials.expiry_date ? Math.floor((credentials.expiry_date - Date.now()) / 1000) : 3600;
      const expiry = credentials.expiry_date ? new Date(credentials.expiry_date).toISOString() : null;
      return JSON.stringify({
        access_token: this.accessToken,
        expires_at: expiry,
        expires_in: expiresIn,
        status: 'success'
      });
    } catch (error) {
      logger.error(`Exception in refreshAccessToken: ${error.message}`);
      return JSON.stringify({
        error: error.message,
        status: 'error'
      });
    }
  }

  async getRecentEmails({ maxResults = 10, unreadOnly = false } = {}) {
    const operation = async () => {
      if (!this.gmail) {
        throw new Error('Gmail service not initialized. No valid access token provided.');
      }
      const query = unreadOnly ? 'is:unread' : '';
      const res = await this.gmail.users.messages.list({
        userId: 'me',
        maxResults,
        labelIds: ['INBOX'],
        q: query
      });
      const messages = res.data.messages || [];
      const emails = [];
      for (const message of messages) {
        const msg = await this.gmail.users.messages.get({
          userId: 'me',
          id: message.id,
          format: 'full'
        });
        const payload = msg.data.payload || {};
        const headers = (payload.headers || []).reduce((acc, h) => {
          const name = h.name.toLowerCase();
          if (['from', 'to', 'subject', 'date'].includes(name)) {
            acc[name] = h.value;
          }
          return acc;
        }, {});
        const { body, body_size_bytes, contains_full_body } = this.extractPlainTextBody(payload);
        emails.push({
          id: msg.data.id,
          threadId: msg.data.threadId,
          labelIds: msg.data.labelIds,
          snippet: msg.data.snippet,
          from: headers.from || '',
          to: headers.to || '',
          subject: headers.subject || '',
          date: headers.date || '',
          internalDate: msg.data.internalDate,
          body,
          body_size_bytes,
          contains_full_body
        });
      }
      return JSON.stringify({ emails });
    };
    return await this._handleTokenRefresh(operation);
  }

  extractPlainTextBody(payload) {
    let body = '';
    let body_size_bytes = 0;
    let contains_full_body = true;
    function extract(parts) {
      if (!parts) return;
      for (const part of parts) {
        if (part.mimeType === 'text/plain' && part.body && part.body.data) {
          const decoded = Buffer.from(part.body.data, 'base64').toString('utf-8');
          body += decoded;
          body_size_bytes += Buffer.byteLength(decoded);
        }
        if (part.parts) extract(part.parts);
      }
    }
    if (payload.body && payload.body.data) {
      const decoded = Buffer.from(payload.body.data, 'base64').toString('utf-8');
      body = decoded;
      body_size_bytes = Buffer.byteLength(decoded);
    }
    if (payload.parts) extract(payload.parts);
    if (body.length > 1000) {
      body = body.slice(0, 1000);
      contains_full_body = false;
    }
    return { body, body_size_bytes, contains_full_body };
  }

  async sendEmail({ to, subject, body, html_body }) {
    const operation = async () => {
      if (!this.gmail) {
        throw new Error('Gmail service not initialized. No valid access token provided.');
      }
      const messageParts = [
        `To: ${to}`,
        `Subject: ${subject}`,
        'Content-Type: multipart/alternative; boundary="boundary"',
        '',
        '--boundary',
        'Content-Type: text/plain; charset="UTF-8"',
        '',
        body,
        '--boundary',
        'Content-Type: text/html; charset="UTF-8"',
        '',
        html_body || '',
        '--boundary--'
      ];
      const rawMessage = Buffer.from(messageParts.join('\r\n')).toString('base64').replace(/\+/g, '-').replace(/\//g, '_');
      const res = await this.gmail.users.messages.send({
        userId: 'me',
        requestBody: { raw: rawMessage }
      });
      return JSON.stringify({
        messageId: res.data.id,
        threadId: res.data.threadId,
        labelIds: res.data.labelIds
      });
    };
    return await this._handleTokenRefresh(operation);
  }

  async getEmailBodyChunk({ message_id, thread_id, offset = 0 }) {
    const operation = async () => {
      if (!this.gmail) {
        throw new Error('Gmail service not initialized. No valid access token provided.');
      }
      let local_message_id = message_id;
      if (!local_message_id && thread_id) {
        const thread = await this.gmail.users.threads.get({ userId: 'me', id: thread_id });
        if (!thread.data.messages || !thread.data.messages.length) {
          return JSON.stringify({
            error: `No messages found in thread ${thread_id}`,
            status: 'error'
          });
        }
        local_message_id = thread.data.messages[0].id;
      }
      if (!local_message_id) {
        return JSON.stringify({
          error: 'Either message_id or thread_id must be provided',
          status: 'error'
        });
      }
      const msg = await this.gmail.users.messages.get({
        userId: 'me',
        id: local_message_id,
        format: 'full'
      });
      const payload = msg.data.payload || {};
      const { body, body_size_bytes } = this.extractPlainTextBody(payload);
      const chunk = offset >= body.length ? '' : body.slice(offset, offset + 1000);
      const contains_full_body = (offset + chunk.length >= body.length);
      return JSON.stringify({
        message_id: local_message_id,
        thread_id: msg.data.threadId,
        body: chunk,
        body_size_bytes,
        offset,
        chunk_size: chunk.length,
        contains_full_body,
        status: 'success'
      });
    };
    return await this._handleTokenRefresh(operation);
  }
}

async function main() {
  logger.info('Starting Gmail MCP server');
  try {
    const server = new McpServer({
      name: 'gmail-client',
      version: '0.1.0'
    });

    server.tool(
      'gmail_refresh_token',
      'Refresh the access token using the refresh token and client credentials',
      {
        google_access_token: z.string().optional().describe('Google OAuth2 access token (optional if expired)'),
        google_refresh_token: z.string().describe('Google OAuth2 refresh token'),
        google_client_id: z.string().describe('Google OAuth2 client ID for token refresh'),
        google_client_secret: z.string().describe('Google OAuth2 client secret for token refresh')
      },
      async ({ google_access_token, google_refresh_token, google_client_id, google_client_secret }) => {
        try {
          const gmail = new GmailClient({
            accessToken: google_access_token,
            refreshToken: google_refresh_token,
            clientId: google_client_id,
            clientSecret: google_client_secret
          });
          const result = await gmail.refreshAccessToken(google_client_id, google_client_secret);
          return { content: [{ type: 'text', text: result }] };
        } catch (error) {
          return { content: [{ type: 'text', text: JSON.stringify({ error: error.message, status: 'error' }) }] };
        }
      }
    );

    server.tool(
      'gmail_get_recent_emails',
      'Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)',
      {
        google_access_token: z.string().describe('Google OAuth2 access token'),
        max_results: z.number().optional().describe('Maximum number of emails to return (default: 10)'),
        unread_only: z.boolean().optional().describe('Whether to return only unread emails (default: False)')
      },
      async ({ google_access_token, max_results = 10, unread_only = false }) => {
        try {
          const gmail = new GmailClient({
            accessToken: google_access_token
          });
          const result = await gmail.getRecentEmails({ maxResults: max_results, unreadOnly: unread_only });
          return { content: [{ type: 'text', text: result }] };
        } catch (error) {
          return { content: [{ type: 'text', text: `Error: ${error.message}` }] };
        }
      }
    );

    server.tool(
      'gmail_get_email_body_chunk',
      'Get a 1k character chunk of an email body starting from the specified offset',
      {
        google_access_token: z.string().describe('Google OAuth2 access token'),
        message_id: z.string().optional().describe('ID of the message to retrieve'),
        thread_id: z.string().optional().describe('ID of the thread to retrieve (will get the first message if multiple exist)'),
        offset: z.number().optional().describe('Offset in characters to start from (default: 0)')
      },
      async ({ google_access_token, message_id, thread_id, offset = 0 }) => {
        try {
          const gmail = new GmailClient({
            accessToken: google_access_token
          });
          const result = await gmail.getEmailBodyChunk({ message_id, thread_id, offset });
          return { content: [{ type: 'text', text: result }] };
        } catch (error) {
          return { content: [{ type: 'text', text: `Error: ${error.message}` }] };
        }
      }
    );

    server.tool(
      'gmail_send_email',
      'Send an email via Gmail',
      {
        google_access_token: z.string().describe('Google OAuth2 access token'),
        to: z.string().describe('Recipient email address'),
        subject: z.string().describe('Email subject'),
        body: z.string().describe('Email body content (plain text)'),
        html_body: z.string().optional().describe('Email body content in HTML format (optional)')
      },
      async ({ google_access_token, to, subject, body, html_body }) => {
        try {
          const gmail = new GmailClient({
            accessToken: google_access_token
          });
          const result = await gmail.sendEmail({ to, subject, body, html_body });
          return { content: [{ type: 'text', text: result }] };
        } catch (error) {
          return { content: [{ type: 'text', text: `Error: ${error.message}` }] };
        }
      }
    );

    const transport = new StdioServerTransport();
    await server.connect(transport);
    logger.info('MCP server started and ready to receive requests');
  } catch (error) {
    logger.error(`Error starting server: ${error.message}`);
    process.exit(1);
  }
}

main(); 
```

--------------------------------------------------------------------------------
/src/mcp_server_headless_gmail/server.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Any, Dict, List, Optional
import os
from dotenv import load_dotenv
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import json
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
import argparse
import base64
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.oauth2.credentials
import google.auth.exceptions
import email
import re
from google.auth.transport.requests import Request

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('mcp_server_headless_gmail')
logger.setLevel(logging.DEBUG)

def convert_datetime_fields(obj: Any) -> Any:
    """Convert any datetime or tzlocal objects to string in the given object"""
    if isinstance(obj, dict):
        return {k: convert_datetime_fields(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_datetime_fields(item) for item in obj]
    elif isinstance(obj, datetime):
        return obj.isoformat()
    elif isinstance(obj, tzlocal):
        # Get the current timezone offset
        offset = datetime.now(tzlocal()).strftime('%z')
        return f"UTC{offset[:3]}:{offset[3:]}"  # Format like "UTC+08:00" or "UTC-05:00"
    return obj

class GmailClient:
    def __init__(self, access_token: Optional[str] = None, refresh_token: Optional[str] = None, 
                 client_id: Optional[str] = None, client_secret: Optional[str] = None):
        if not access_token and not refresh_token:
            raise ValueError("Either access_token or refresh_token must be provided")
        
        # Create credentials from the provided tokens
        self.credentials = google.oauth2.credentials.Credentials(
            token=access_token,
            refresh_token=refresh_token,
            token_uri="https://oauth2.googleapis.com/token",
            client_id=client_id,
            client_secret=client_secret,
        )
        
        # Build the Gmail service if access token is provided
        if access_token:
            self.service = build('gmail', 'v1', credentials=self.credentials, cache_discovery=False)

    def _handle_token_refresh(self, func):
        """Decorator to handle token refresh errors gracefully"""
        try:
            return func()
        except google.auth.exceptions.RefreshError as e:
            logger.error(f"Token refresh error: {str(e)}")
            return json.dumps({
                "error": "Token refresh failed. Please provide new access and refresh tokens.",
                "details": str(e)
            })

    def refresh_token(self, client_id: str, client_secret: str) -> str:
        """Refresh the access token using the refresh token
        
        Args:
            client_id: Google OAuth2 client ID
            client_secret: Google OAuth2 client secret
        """
        if not self.credentials.refresh_token:
            return json.dumps({
                "error": "No refresh token provided",
                "status": "error"
            })
            
        try:
            # Set client_id and client_secret for refresh
            self.credentials._client_id = client_id
            self.credentials._client_secret = client_secret
            
            # Force refresh
            request = Request()
            self.credentials.refresh(request)
            
            # Get token expiration time
            expiry = self.credentials.expiry
            
            # Return the new access token and its expiration
            return json.dumps({
                "access_token": self.credentials.token,
                "expires_at": expiry.isoformat() if expiry else None,
                "expires_in": int((expiry - datetime.now(expiry.tzinfo)).total_seconds()) if expiry else None,
                "status": "success"
            })
            
        except google.auth.exceptions.RefreshError as e:
            logger.error(f"Token refresh error: {str(e)}")
            return json.dumps({
                "error": "Token refresh failed. Please provide valid client ID and client secret.",
                "details": str(e),
                "status": "error"
            })
        except Exception as e:
            logger.error(f"Exception: {str(e)}")
            return json.dumps({
                "error": str(e),
                "status": "error"
            })

    def extract_plain_text_body(self, msg_payload):
        """Extract plain text body from message payload
        
        Args:
            msg_payload: Gmail API message payload
            
        Returns:
            tuple: (plain_text_body, body_size_in_bytes)
        """
        body_text = ""
        body_size = 0
        
        # Helper function to process message parts recursively
        def extract_from_parts(parts):
            nonlocal body_text, body_size
            
            if not parts:
                return
                
            for part in parts:
                mime_type = part.get('mimeType', '')
                
                # If this part is plain text
                if mime_type == 'text/plain':
                    body_data = part.get('body', {}).get('data', '')
                    if body_data:
                        # Decode base64url encoded data
                        decoded_bytes = base64.urlsafe_b64decode(body_data)
                        body_size += len(decoded_bytes)
                        body_part = decoded_bytes.decode('utf-8', errors='replace')
                        body_text += body_part
                
                # If this part has child parts, process them
                if 'parts' in part:
                    extract_from_parts(part['parts'])
        
        # If body data is directly in the payload
        if 'body' in msg_payload and 'data' in msg_payload['body']:
            body_data = msg_payload['body']['data']
            if body_data:
                decoded_bytes = base64.urlsafe_b64decode(body_data)
                body_size += len(decoded_bytes)
                body_text = decoded_bytes.decode('utf-8', errors='replace')
        
        # If message has parts, process them
        if 'parts' in msg_payload:
            extract_from_parts(msg_payload['parts'])
            
        return body_text, body_size

    def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str:
        """Get the most recent emails from Gmail
        
        Args:
            max_results: Maximum number of emails to return (default: 10)
            unread_only: Whether to return only unread emails (default: False)
            
        Returns:
            JSON string with an array of emails containing metadata, snippets, and first 1k chars of body
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })
                
            # Define the operation
            def _operation():
                logger.debug(f"Fetching up to {max_results} recent emails from Gmail")
                
                # Get list of recent messages
                query = 'is:unread' if unread_only else ''
                logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{query}'")
                
                try:
                    response = self.service.users().messages().list(
                        userId='me',
                        maxResults=max_results,
                        labelIds=['INBOX'],
                        q=query
                    ).execute()
                    
                    logger.debug(f"API Response received: {json.dumps(response)[:200]}...")
                except Exception as e:
                    logger.error(f"Error calling Gmail API list: {str(e)}", exc_info=True)
                    return json.dumps({"error": f"Gmail API list error: {str(e)}"})
                
                messages = response.get('messages', [])
                
                if not messages:
                    logger.debug("No messages found in the response")
                    return json.dumps({"emails": []})
                
                logger.debug(f"Found {len(messages)} messages, processing details")
                
                # Fetch detailed information for each message
                emails = []
                for i, message in enumerate(messages):
                    logger.debug(f"Fetching details for message {i+1}/{len(messages)}, ID: {message['id']}")
                    msg = self.service.users().messages().get(
                        userId='me',
                        id=message['id'],
                        format='full'
                    ).execute()
                    
                    logger.debug(f"Message {message['id']} details received, extracting fields")
                    
                    # Extract headers
                    headers = {}
                    if 'payload' in msg and 'headers' in msg['payload']:
                        for header in msg['payload']['headers']:
                            name = header.get('name', '').lower()
                            if name in ['from', 'to', 'subject', 'date']:
                                headers[name] = header.get('value', '')
                    else:
                        logger.debug(f"Message {message['id']} missing payload or headers fields: {json.dumps(msg)[:200]}...")
                    
                    # Extract plain text body and size
                    body_text = ""
                    body_size_bytes = 0
                    contains_full_body = True
                    
                    if 'payload' in msg:
                        body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
                        
                        # Check if we're returning the full body or truncating
                        if len(body_text) > 1000:
                            body_text = body_text[:1000]
                            contains_full_body = False
                    
                    # Format the email
                    email_data = {
                        "id": msg['id'],
                        "threadId": msg['threadId'],
                        "labelIds": msg.get('labelIds', []),
                        "snippet": msg.get('snippet', ''),
                        "from": headers.get('from', ''),
                        "to": headers.get('to', ''),
                        "subject": headers.get('subject', ''),
                        "date": headers.get('date', ''),
                        "internalDate": msg.get('internalDate', ''),
                        "body": body_text,
                        "body_size_bytes": body_size_bytes,
                        "contains_full_body": contains_full_body
                    }
                    
                    logger.debug(f"Successfully processed message {message['id']}")
                    emails.append(email_data)
                
                logger.debug(f"Successfully processed {len(emails)} emails")
                return json.dumps({"emails": convert_datetime_fields(emails)})
            
            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
            
        except HttpError as e:
            logger.error(f"Gmail API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_recent_emails: {str(e)}", exc_info=True)
            return json.dumps({"error": str(e)})

    def send_email(self, to: str, subject: str, body: str, html_body: Optional[str] = None) -> str:
        """Send an email via Gmail
        
        Args:
            to: Recipient email address
            subject: Email subject
            body: Plain text email body
            html_body: Optional HTML email body
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })
                
            # Define the operation
            def _operation():
                # Create message container
                message = MIMEMultipart('alternative')
                message['to'] = to
                message['subject'] = subject
                
                # Attach plain text and HTML parts
                message.attach(MIMEText(body, 'plain'))
                if html_body:
                    message.attach(MIMEText(html_body, 'html'))
                
                # Encode the message
                encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
                
                # Create the message body
                create_message = {
                    'raw': encoded_message
                }
                
                # Send the message
                send_response = self.service.users().messages().send(
                    userId='me', 
                    body=create_message
                ).execute()
                
                return json.dumps({
                    "messageId": send_response['id'],
                    "threadId": send_response.get('threadId', ''),
                    "labelIds": send_response.get('labelIds', [])
                })
            
            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
            
        except HttpError as e:
            logger.error(f"API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception: {str(e)}")
            return json.dumps({"error": str(e)})

    def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
        """Get a chunk of the email body
        
        Args:
            message_id: ID of the message to retrieve
            thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
            offset: Offset in characters to start from (default: 0)
            
        Returns:
            JSON string with the body chunk and metadata
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })
                
            # Define the operation
            def _operation():
                logger.debug(f"Fetching email body chunk with offset {offset}")
                
                # Store message_id in local variable to make it accessible within _operation scope
                local_message_id = message_id
                local_thread_id = thread_id
                
                # Validate inputs
                if not local_message_id and not local_thread_id:
                    return json.dumps({
                        "error": "Either message_id or thread_id must be provided",
                        "status": "error"
                    })
                
                try:
                    # If thread_id is provided but not message_id, get the first message in thread
                    if local_thread_id and not local_message_id:
                        logger.debug(f"Getting messages in thread {local_thread_id}")
                        thread = self.service.users().threads().get(
                            userId='me',
                            id=local_thread_id
                        ).execute()
                        
                        if not thread or 'messages' not in thread or not thread['messages']:
                            return json.dumps({
                                "error": f"No messages found in thread {local_thread_id}",
                                "status": "error"
                            })
                            
                        # Use the first message in the thread
                        local_message_id = thread['messages'][0]['id']
                        logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")
                    
                    # Get the message
                    logger.debug(f"Getting message {local_message_id}")
                    msg = self.service.users().messages().get(
                        userId='me',
                        id=local_message_id,
                        format='full'
                    ).execute()
                    
                    # Extract the full plain text body
                    body_text = ""
                    body_size_bytes = 0
                    
                    if 'payload' in msg:
                        body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
                    
                    # Apply offset and get chunk
                    if offset >= len(body_text):
                        chunk = ""
                    else:
                        chunk = body_text[offset:offset+1000]
                    
                    # Determine if this contains the full remaining body
                    contains_full_body = (offset + len(chunk) >= len(body_text))
                    
                    return json.dumps({
                        "message_id": local_message_id,
                        "thread_id": msg.get('threadId', ''),
                        "body": chunk,
                        "body_size_bytes": body_size_bytes,
                        "offset": offset,
                        "chunk_size": len(chunk),
                        "contains_full_body": contains_full_body,
                        "status": "success"
                    })
                    
                except Exception as e:
                    logger.error(f"Error processing message: {str(e)}", exc_info=True)
                    return json.dumps({
                        "error": f"Error processing message: {str(e)}",
                        "status": "error"
                    })
            
            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
            
        except HttpError as e:
            logger.error(f"Gmail API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
            return json.dumps({"error": str(e)})

async def main():
    """Run the Gmail MCP server."""
    logger.info("Gmail server starting")
    server = Server("gmail-client")

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        return []

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        if uri.scheme != "gmail":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        path = str(uri).replace("gmail://", "")
        return ""

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="gmail_refresh_token",
                description="Refresh the access token using the refresh token and client credentials",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "google_access_token": {"type": "string", "description": "Google OAuth2 access token (optional if expired)"},
                        "google_refresh_token": {"type": "string", "description": "Google OAuth2 refresh token"},
                        "google_client_id": {"type": "string", "description": "Google OAuth2 client ID for token refresh"},
                        "google_client_secret": {"type": "string", "description": "Google OAuth2 client secret for token refresh"}
                    },
                    "required": ["google_refresh_token", "google_client_id", "google_client_secret"]
                },
            ),
            types.Tool(
                name="gmail_get_recent_emails",
                description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                        "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"},
                        "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"}
                    },
                    "required": ["google_access_token"]
                },
            ),
            types.Tool(
                name="gmail_get_email_body_chunk",
                description="Get a 1k character chunk of an email body starting from the specified offset",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                        "message_id": {"type": "string", "description": "ID of the message to retrieve"},
                        "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
                        "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
                    },
                    "required": ["google_access_token"]
                },
            ),
            types.Tool(
                name="gmail_send_email",
                description="Send an email via Gmail",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                        "to": {"type": "string", "description": "Recipient email address"},
                        "subject": {"type": "string", "description": "Email subject"},
                        "body": {"type": "string", "description": "Email body content (plain text)"},
                        "html_body": {"type": "string", "description": "Email body content in HTML format (optional)"}
                    },
                    "required": ["google_access_token", "to", "subject", "body"]
                },
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Handle tool execution requests"""
        try:
            if not arguments:
                raise ValueError(f"Missing arguments for {name}")
            
            if name == "gmail_refresh_token":
                # For refresh token, we need refresh token, client ID and secret
                refresh_token = arguments.get("google_refresh_token")
                client_id = arguments.get("google_client_id")
                client_secret = arguments.get("google_client_secret")
                access_token = arguments.get("google_access_token")  # Optional for refresh
                
                if not refresh_token:
                    raise ValueError("google_refresh_token is required for token refresh")
                
                if not client_id or not client_secret:
                    raise ValueError("Both google_client_id and google_client_secret are required for token refresh")
                
                # Initialize Gmail client for token refresh
                gmail = GmailClient(
                    access_token=access_token, 
                    refresh_token=refresh_token
                )
                
                # Call the refresh_token method
                results = gmail.refresh_token(client_id=client_id, client_secret=client_secret)
                return [types.TextContent(type="text", text=results)]
            
            else:
                # For all other tools, we only need access token
                access_token = arguments.get("google_access_token")
                
                if not access_token:
                    raise ValueError("google_access_token is required")
                
                if name == "gmail_get_recent_emails":
                    # Initialize Gmail client with just access token
                    logger.debug(f"Initializing Gmail client for get_recent_emails with access token: {access_token[:10]}...")
                    try:
                        gmail = GmailClient(
                            access_token=access_token
                        )
                        logger.debug("Gmail client initialized successfully")
                        
                        max_results = int(arguments.get("max_results", 10))
                        unread_only = bool(arguments.get("unread_only", False))
                        logger.debug(f"Calling get_recent_emails with max_results={max_results} and unread_only={unread_only}")
                        results = gmail.get_recent_emails(max_results=max_results, unread_only=unread_only)
                        logger.debug(f"get_recent_emails result (first 200 chars): {results[:200]}...")
                        return [types.TextContent(type="text", text=results)]
                    except Exception as e:
                        logger.error(f"Exception in gmail_get_recent_emails handler: {str(e)}", exc_info=True)
                        return [types.TextContent(type="text", text=f"Error: {str(e)}")]
                    
                elif name == "gmail_get_email_body_chunk":
                    # Initialize Gmail client with just access token
                    gmail = GmailClient(
                        access_token=access_token
                    )
                    
                    message_id = arguments.get("message_id")
                    thread_id = arguments.get("thread_id")
                    offset = int(arguments.get("offset", 0))
                    
                    if not message_id and not thread_id:
                        raise ValueError("Either message_id or thread_id must be provided")
                    
                    results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
                    return [types.TextContent(type="text", text=results)]
                    
                elif name == "gmail_send_email":
                    # Initialize Gmail client with just access token
                    gmail = GmailClient(
                        access_token=access_token
                    )
                    
                    to = arguments.get("to")
                    subject = arguments.get("subject")
                    body = arguments.get("body")
                    html_body = arguments.get("html_body")
                    
                    if not to or not subject or not body:
                        raise ValueError("Missing required parameters: to, subject, and body are required")
                    
                    results = gmail.send_email(to=to, subject=subject, body=body, html_body=html_body)
                    return [types.TextContent(type="text", text=results)]

                else:
                    raise ValueError(f"Unknown tool: {name}")

        except Exception as e:
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gmail",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    import asyncio
    
    # Simplified command-line with no OAuth parameters
    asyncio.run(main()) 
```