#
tokens: 11143/50000 13/13 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── Dockerfile
├── gmail_api.py
├── gmail_mcp.log
├── gmail_server.py
├── gmail_token_creator.py
├── google_apis.py
├── LICENSE
├── pyproject.toml
├── read_emails.py
├── README.md
├── search_emails.py
├── send_emails.py
├── smithery.yaml
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python virtual environments
 2 | venv/
 3 | .env
 4 | 
 5 | # Token files and secrets
 6 | client_secret.json
 7 | token_files/
 8 | *.log
 9 | __pycache__/
10 | *.pyc
11 | 
12 | # Mac/Windows files
13 | .DS_Store
14 | 
15 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Gmail MCP Server
  2 | [![smithery badge](https://smithery.ai/badge/@Quantum-369/Gmail-mcp-server)](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server)
  3 | 
  4 | A powerful and flexible Gmail integration server built using the MCP (Message Control Protocol) framework. This server provides a robust interface to interact with Gmail APIs, offering functionality for reading, sending, and managing emails programmatically.
  5 | 
  6 | ## Features
  7 | 
  8 | - Read emails from multiple Gmail accounts
  9 | - Send emails with attachments
 10 | - Search emails with advanced query options
 11 | - Download email attachments
 12 | - Handle email conversations and threads
 13 | - Real-time email monitoring
 14 | - Support for multiple Gmail accounts
 15 | 
 16 | ## Prerequisites
 17 | 
 18 | Before running the Gmail MCP server, ensure you have the following:
 19 | 
 20 | 1. Python 3.12 or higher
 21 | 2. Google Cloud Project with Gmail API enabled
 22 | 3. OAuth 2.0 Client ID credentials
 23 | 4. Required Python packages (specified in pyproject.toml)
 24 | 
 25 | ## Installation
 26 | 
 27 | ### Installing via Smithery
 28 | 
 29 | To install Gmail Integration Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@Quantum-369/Gmail-mcp-server):
 30 | 
 31 | ```bash
 32 | npx -y @smithery/cli install @Quantum-369/Gmail-mcp-server --client claude
 33 | ```
 34 | 
 35 | 1. Clone the repository:
 36 | ```bash
 37 | git clone <your-repository-url>
 38 | cd gmail-mcp-server
 39 | ```
 40 | 
 41 | 2. Create and activate a virtual environment:
 42 | ```bash
 43 | python -m venv venv
 44 | # On Windows
 45 | venv\Scripts\activate
 46 | # On Unix/MacOS
 47 | source venv/bin/activate
 48 | ```
 49 | 
 50 | 3. Install dependencies:
 51 | ```bash
 52 | pip install .
 53 | ```
 54 | 
 55 | ## Running on Ubuntu
 56 | 
 57 | 1. Install Python 3.12 and create a virtual environment if you haven't already:
 58 |    ```bash
 59 |    sudo apt install python3.12 python3.12-venv -y
 60 |    python3.12 -m venv venv
 61 |    source venv/bin/activate
 62 |    ```
 63 | 2. Place your downloaded `client_secret.json` in the project root.
 64 | 3. Generate an OAuth token for the Gmail account you want to use. Copy the URL
 65 |    printed by the script into a web browser and complete the sign-in process:
 66 |    ```bash
 67 |    python gmail_token_creator.py
 68 |    ```
 69 | 4. Start the server:
 70 |    ```bash
 71 |    python gmail_server.py
 72 |    ```
 73 | 
 74 | ## Setup Google Cloud Project
 75 | 
 76 | 1. Go to the [Google Cloud Console](https://console.cloud.google.com/)
 77 | 2. Create a new project or select an existing one
 78 | 3. Enable the Gmail API for your project
 79 | 4. Create OAuth 2.0 credentials:
 80 |    - Go to "APIs & Services" > "Credentials"
 81 |    - Click "Create Credentials" > "OAuth client ID"
 82 |    - Choose "Desktop app" as application type
 83 |    - Download the client configuration file
 84 | 5. Rename the downloaded file to `client_secret.json` and place it in the project root directory
 85 | 
 86 | ## Configuration
 87 | 
 88 | 1. Set up email identifiers in `gmail_token_creator.py`:
 89 | ```python
 90 | email_identifier = '[email protected]'  # Change this for each account
 91 | ```
 92 | 
 93 | 2. Run the token creator to authenticate your Gmail accounts:
 94 | ```bash
 95 | python gmail_token_creator.py
 96 | ```
 97 | The script prints an authorization URL. Copy this URL into your web browser,
 98 | complete the Google consent flow, and copy the verification code back into the
 99 | terminal if prompted. A token file will be created inside `token_files/` for
100 | future use.
101 | 
102 | 3. Repeat the process for each Gmail account you want to integrate
103 | 
104 | ## Server Structure
105 | 
106 | - `gmail_server.py`: Main MCP server implementation
107 | - `gmail_api.py`: Gmail API interaction functions
108 | - `google_apis.py`: Google API authentication utilities
109 | - Supporting files:
110 |   - `read_emails.py`: Email reading functionality
111 |   - `search_emails.py`: Email search functionality
112 |   - `send_emails.py`: Email sending functionality
113 | 
114 | ## Usage
115 | 
116 | ### Starting the Server
117 | 
118 | ```bash
119 | python gmail_server.py
120 | ```
121 | 
122 | ### Running with Docker
123 | 
124 | You can also build a container image using the provided `Dockerfile`:
125 | 
126 | ```bash
127 | docker build -t gmail-mcp-server .
128 | docker run -v $(pwd)/client_secret.json:/app/client_secret.json \
129 |            -v $(pwd)/token_files:/app/token_files gmail-mcp-server
130 | ```
131 | 
132 | The container runs the same server and stores authentication tokens in the
133 | `token_files` directory on the host so they persist between runs.
134 | 
135 | ### Available Tools
136 | 
137 | 1. Send Email:
138 | ```python
139 | await send_gmail(
140 |     email_identifier="[email protected]",
141 |     to="[email protected]",
142 |     subject="Test Subject",
143 |     body="Email body content",
144 |     attachment_paths=["path/to/attachment"]
145 | )
146 | ```
147 | 
148 | 2. Search Emails:
149 | ```python
150 | await search_email_tool(
151 |     email_identifier="[email protected]",
152 |     query="from:[email protected]",
153 |     max_results=30,
154 |     include_conversations=True
155 | )
156 | ```
157 | 
158 | 3. Read Latest Emails:
159 | ```python
160 | await read_latest_emails(
161 |     email_identifier="[email protected]",
162 |     max_results=5,
163 |     download_attachments=False
164 | )
165 | ```
166 | 
167 | 4. Download Attachments:
168 | ```python
169 | await download_email_attachments(
170 |     email_identifier="[email protected]",
171 |     msg_id="message_id",
172 |     download_all_in_thread=False
173 | )
174 | ```
175 | 
176 | ## Security Considerations
177 | 
178 | - Store `client_secret.json` securely and never commit it to version control
179 | - Keep token files secure and add them to `.gitignore`
180 | - Use environment variables for sensitive information
181 | - Regularly rotate OAuth credentials
182 | - Monitor API usage and set appropriate quotas
183 | 
184 | ## Error Handling
185 | 
186 | The server includes comprehensive error handling and logging:
187 | - Logs are written to `gmail_mcp.log`
188 | - Both file and console logging are enabled
189 | - Detailed error messages for debugging
190 | 
191 | ## Contributing
192 | 
193 | 1. Fork the repository
194 | 2. Create a feature branch
195 | 3. Commit your changes
196 | 4. Push to the branch
197 | 5. Create a Pull Request
198 | 
199 | ## License
200 | 
201 | Apachelicense2.0
202 | 
203 | ## Support
204 | 
205 | For issues and feature requests, please use the GitHub issue tracker.
206 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "gmail-server"
 3 | version = "0.1.0"
 4 | description = "Add your description here"
 5 | readme = "README.md"
 6 | requires-python = ">=3.12"
 7 | dependencies = [
 8 |     "google-api-python-client>=2.160.0",
 9 |     "google-auth>=2.38.0",
10 |     "google-auth-oauthlib>=1.2.1",
11 |     "mcp[cli]>=1.2.1",
12 | ]
13 | 
```

--------------------------------------------------------------------------------
/gmail_token_creator.py:
--------------------------------------------------------------------------------

```python
 1 | 
 2 | # gmail_api.py
 3 | from google_apis import create_service
 4 | 
 5 | client_secret_file = 'client_secret.json'
 6 | API_SERVICE_NAME = 'gmail'
 7 | API_VERSION = 'v1'
 8 | SCOPES = ['https://mail.google.com/']
 9 | 
10 | # Add an identifier for each account, like the email address
11 | email_identifier = '[email protected]'  # Change this for each account
12 | #[email protected]
13 | #[email protected]
14 | #[email protected]
15 | #[email protected]
16 | #[email protected]
17 | 
18 | # Pass the email identifier as a prefix to create unique token files
19 | service = create_service(client_secret_file, API_SERVICE_NAME, API_VERSION, SCOPES, prefix=f'_{email_identifier}')
20 | #print(dir(service))
21 | 
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
 1 | # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
 2 | 
 3 | startCommand:
 4 |   type: stdio
 5 |   configSchema:
 6 |     # JSON Schema defining the configuration options for the MCP.
 7 |     type: object
 8 |     required:
 9 |       - clientSecretPath
10 |       - emailIdentifier
11 |     properties:
12 |       clientSecretPath:
13 |         type: string
14 |         description: Path to the OAuth 2.0 client secret JSON file.
15 |       emailIdentifier:
16 |         type: string
17 |         description: Email identifier for the Gmail account.
18 |   commandFunction:
19 |     # A function that produces the CLI command to start the MCP on stdio.
20 |     |-
21 |     (config) => ({ command: 'python', args: ['gmail_server.py'], env: { CLIENT_SECRET_PATH: config.clientSecretPath, EMAIL_IDENTIFIER: config.emailIdentifier } })
22 | 
```

--------------------------------------------------------------------------------
/send_emails.py:
--------------------------------------------------------------------------------

```python
 1 | from pathlib import Path
 2 | from gmail_api import init_gmail_service, send_email
 3 | 
 4 | # Configuration
 5 | client_file = 'client_secret.json'
 6 | email_identifier = '[email protected]'  # Change this for each account
 7 | 
 8 | # Initialize Gmail API service for the specific email account
 9 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}')
10 | 
11 | # Email details
12 | to_address = '[email protected]'
13 | email_subject = 'MCP servers document'
14 | email_body = 'This is a test email sent using the Gmail API.'
15 | 
16 | # Attachments
17 | attachment_dir = Path('C:\\Users\\harsh\\Downloads\\MS projects\\MCP\\mcpdocs')
18 | attachment_files = list(attachment_dir.glob('*'))  # Load all files from the attachments folder
19 | 
20 | # Send the email
21 | response_email_sent = send_email(
22 |     service=service,
23 |     to=to_address,
24 |     subject=email_subject,
25 |     body=email_body, 
26 |     body_type='plain',
27 |     attachment_paths=attachment_files
28 | )
29 | 
30 | # Output response
31 | print(response_email_sent)
32 | 
33 | 
```

--------------------------------------------------------------------------------
/search_emails.py:
--------------------------------------------------------------------------------

```python
 1 | from gmail_api import init_gmail_service, get_email_message_details, search_emails, search_email_conversations
 2 | 
 3 | client_file = 'client_secret.json'
 4 | email_identifier = '[email protected]'  # Specify the email identifier
 5 | 
 6 | # Initialize Gmail API service
 7 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}')
 8 | 
 9 | query = 'from:me'
10 | email_messages = search_emails(service, query, max_results=30)
11 | email_messages += search_email_conversations(service, query, max_results=30)  # Combine both
12 | 
13 | for message in email_messages:
14 |     email_detail = get_email_message_details(service, message['id'])
15 |     
16 |     if email_detail:  # Check if email details were fetched successfully
17 |         print(message['id'])
18 |         print(f"Subject: {email_detail.get('subject', 'No Subject')}")
19 |         print(f"Date: {email_detail.get('date', 'No Date')}")
20 |         print(f"Label: {email_detail.get('label', 'No Label')}")
21 |         print('Snippet: ', email_detail.get('snippet', 'No Snippet'))
22 |         print(f"Body: {email_detail.get('body', 'No Body')}")
23 |         print('-' * 50)
24 |     else:
25 |         print(f"Skipping message ID {message['id']} as it couldn't be retrieved.")
26 | 
27 | 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
 1 | # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
 2 | # Use a Python image with uv pre-installed
 3 | FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv
 4 | 
 5 | # Install the project into /app
 6 | WORKDIR /app
 7 | 
 8 | # Enable bytecode compilation
 9 | ENV UV_COMPILE_BYTECODE=1
10 | 
11 | # Copy from the cache instead of linking since it's a mounted volume
12 | ENV UV_LINK_MODE=copy
13 | 
14 | # Install the project's dependencies using the lockfile and settings
15 | RUN --mount=type=cache,target=/root/.cache/uv     --mount=type=bind,source=uv.lock,target=uv.lock     --mount=type=bind,source=pyproject.toml,target=pyproject.toml     uv sync --frozen --no-install-project --no-dev --no-editable
16 | 
17 | # Then, add the rest of the project source code and install it
18 | # Installing separately from its dependencies allows optimal layer caching
19 | ADD . /app
20 | RUN --mount=type=cache,target=/root/.cache/uv     uv sync --frozen --no-dev --no-editable
21 | 
22 | FROM python:3.12-slim-bookworm
23 | 
24 | WORKDIR /app
25 | 
26 | COPY --from=uv /root/.local /root/.local
27 | COPY --from=uv --chown=app:app /app/.venv /app/.venv
28 | 
29 | # Place executables in the environment at the front of the path
30 | ENV PATH="/app/.venv/bin:$PATH"
31 | 
32 | # when running the container, run the main server script
33 | ENTRYPOINT ["python", "gmail_server.py"]
34 | 
```

--------------------------------------------------------------------------------
/read_emails.py:
--------------------------------------------------------------------------------

```python
 1 | from pathlib import Path
 2 | from gmail_api import init_gmail_service, get_email_messages, get_email_message_details, download_attachments_parent
 3 | 
 4 | client_file = 'client_secret.json'
 5 | email_identifier = '[email protected]'  # Specify the email identifier
 6 | 
 7 | # Initialize Gmail API service
 8 | service = init_gmail_service(client_file, prefix=f'_{email_identifier}')
 9 | 
10 | # Correctly unpack the messages and next_page_token
11 | messages, _ = get_email_messages(service, max_results=5)
12 | 
13 | # Target directory to save attachments
14 | attachment_dir = Path('./downloaded_attachments')  # Folder to save attachments
15 | attachment_dir.mkdir(exist_ok=True)  # Create the folder if it doesn't exist
16 | 
17 | # Process Emails
18 | for msg in messages:
19 |     details = get_email_message_details(service, msg['id'])
20 |     if details:
21 |         print(f"Subject: {details['subject']}")
22 |         print(f"From: {details['sender']}")
23 |         print(f"Recipients: {details['recipients']}")
24 |         print(f"Body: {details['body'][:100]}...")  # Print first 100 characters of the body
25 |         print(f"Snippet: {details['snippet']}")
26 |         print(f"Has Attachments: {details['has_attachments']}")
27 |         print(f"Date: {details['date']}")
28 |         print(f"Star: {details['star']}")
29 |         print(f"Label: {details['label']}")
30 |         print("-" * 50)
31 | 
32 |         # Download Attachments if present
33 |         if details['has_attachments']:
34 |             download_attachments_parent(service, user_id='me', msg_id=msg['id'], target_dir=str(attachment_dir))
35 | 
```

--------------------------------------------------------------------------------
/google_apis.py:
--------------------------------------------------------------------------------

```python
 1 | # google_apis.py
 2 | import os
 3 | from google_auth_oauthlib.flow import InstalledAppFlow
 4 | from googleapiclient.discovery import build
 5 | from google.oauth2.credentials import Credentials
 6 | from google.auth.transport.requests import Request
 7 | 
 8 | def create_service(client_secret_file, api_name, api_version, *scopes, prefix=''):
 9 |     CLIENT_SECRET_FILE = client_secret_file
10 |     API_SERVICE_NAME = api_name
11 |     API_VERSION = api_version
12 | 
13 |     SCOPES = [scope for scope in scopes[0]]
14 | 
15 |     creds = None
16 |     working_dir = os.getcwd()
17 |     token_dir = 'token_files'
18 | 
19 |     # Include the prefix (email identifier) in the token file name
20 |     token_file = f'token_{API_SERVICE_NAME}_{API_VERSION}{prefix}.json'
21 | 
22 |     # Check if the token directory exists, create if not
23 |     if not os.path.exists(os.path.join(working_dir, token_dir)):
24 |         os.mkdir(os.path.join(working_dir, token_dir))
25 | 
26 |     # Load existing credentials if available
27 |     if os.path.exists(os.path.join(working_dir, token_dir, token_file)):
28 |         creds = Credentials.from_authorized_user_file(
29 |             os.path.join(working_dir, token_dir, token_file), SCOPES
30 |         )
31 | 
32 |     # If no valid credentials, initiate the authentication flow
33 |     if not creds or not creds.valid:
34 |         if creds and creds.expired and creds.refresh_token:
35 |             creds.refresh(Request())
36 |         else:
37 |             flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
38 |             creds = flow.run_local_server(port=0)
39 | 
40 |         # Save the credentials for future use
41 |         with open(os.path.join(working_dir, token_dir, token_file), 'w') as token:
42 |             token.write(creds.to_json())
43 | 
44 |     try:
45 |         service = build(API_SERVICE_NAME, API_VERSION, credentials=creds, static_discovery=False)
46 |         print(f'{API_SERVICE_NAME} {API_VERSION} service created successfully for {prefix}')
47 |         return service
48 |     except Exception as e:
49 |         print(e)
50 |         print(f'Failed to create service instance for {API_SERVICE_NAME}')
51 |         # Remove corrupted token file if exists
52 |         if os.path.exists(os.path.join(working_dir, token_dir, token_file)):
53 |             os.remove(os.path.join(working_dir, token_dir, token_file))
54 |         return None
55 | 
```

--------------------------------------------------------------------------------
/gmail_api.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import base64
  3 | from email.mime.text import MIMEText
  4 | from email.mime.multipart import MIMEMultipart
  5 | from email.mime.base import MIMEBase
  6 | from email import encoders
  7 | from pathlib import Path
  8 | from google_apis import create_service
  9 | 
 10 | def init_gmail_service(client_file, api_name='gmail', api_version='v1', scopes=['https://mail.google.com/'], prefix=''):
 11 |     return create_service(client_file, api_name, api_version, scopes, prefix=prefix)
 12 | 
 13 | 
 14 | def _extract_body(payload):
 15 |     body = '<text body not available>'
 16 |     if 'parts' in payload:
 17 |         for part in payload['parts']:
 18 |             if part['mimeType'] == 'multipart/alternative':
 19 |                 for subpart in part['parts']:
 20 |                     if subpart['mimeType'] == 'text/plain' and 'data' in subpart['body']:
 21 |                         body = base64.urlsafe_b64decode(subpart['body']['data']).decode('utf-8')
 22 |                         break
 23 |             elif part['body']['data']:
 24 |                 body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
 25 |                 break
 26 |     return body
 27 | 
 28 | def get_email_messages(service, user_id='me', label_ids=None, folder_name='INBOX', max_results=5):
 29 |     messages = []
 30 |     next_page_token = None
 31 | 
 32 |     if folder_name:
 33 |         label_results = service.users().labels().list(userId=user_id).execute()
 34 |         labels = label_results.get('labels', [])
 35 |         folder_label_id = next((label['id'] for label in labels if label['name'].lower() == folder_name.lower()), None)
 36 | 
 37 |         if folder_label_id:
 38 |             message_response = service.users().messages().list(userId=user_id, labelIds=[folder_label_id], maxResults=max_results).execute()
 39 |             messages.extend(message_response.get('messages', []))
 40 |             next_page_token = message_response.get('nextPageToken', None)
 41 | 
 42 |     return messages, next_page_token
 43 | 
 44 | def get_email_message_details(service, msg_id):
 45 |     try:
 46 |         message = service.users().messages().get(userId='me', id=msg_id).execute()
 47 |         payload = message['payload']
 48 |         headers = payload.get('headers', [])
 49 |         
 50 |         subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No subject')
 51 |         
 52 |         sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'No sender')
 53 |         
 54 |         recipients = next((header['value'] for header in headers if header['name'].lower() == 'to'), 'No recipients')
 55 |         
 56 |         snippet = message.get('snippet', 'No snippet')
 57 |         
 58 |         has_attachments = any(part.get('filename') for part in payload.get('parts', []) if part.get('filename'))
 59 |         
 60 |         date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'No date')
 61 |         
 62 |         star = message.get('labelIds', []).count('STARRED') > 0
 63 |         
 64 |         label = ', '.join(message.get('labelIds', []))
 65 |         
 66 |         body = _extract_body(payload)
 67 |         
 68 |         return {
 69 |             'subject': subject,
 70 |             'sender': sender,
 71 |             'recipients': recipients,
 72 |             'body': body,
 73 |             'snippet': snippet,
 74 |             'has_attachments': has_attachments,
 75 |             'date': date,
 76 |             'star': star,
 77 |             'label': label
 78 |         }
 79 |     except Exception as e:
 80 |         print(f'Error getting email message details: {e}')
 81 |         return None
 82 | def send_email(service, to, subject, body, body_type='plain', attachment_paths=None):
 83 |     from email.mime.text import MIMEText
 84 |     from email.mime.multipart import MIMEMultipart
 85 |     from email.mime.base import MIMEBase
 86 |     from email import encoders
 87 |     import base64
 88 |     import os
 89 |     from pathlib import Path
 90 |     import mimetypes
 91 | 
 92 |     message = MIMEMultipart()
 93 |     message['to'] = to
 94 |     message['subject'] = subject
 95 | 
 96 |     message.attach(MIMEText(body, body_type))
 97 | 
 98 |     # Handle attachments
 99 |     if attachment_paths:
100 |         for attachment_path in attachment_paths:
101 |             attachment_path = Path(attachment_path)  # Ensure it's a Path object
102 |             if not attachment_path.exists():
103 |                 raise FileNotFoundError(f"File not found - {attachment_path}")
104 | 
105 |             # Guess the MIME type based on file extension
106 |             content_type, encoding = mimetypes.guess_type(attachment_path)
107 |             if content_type is None or encoding is not None:
108 |                 content_type = 'application/octet-stream'
109 | 
110 |             main_type, sub_type = content_type.split('/', 1)
111 | 
112 |             with open(attachment_path, 'rb') as attachment_file:
113 |                 part = MIMEBase(main_type, sub_type)
114 |                 part.set_payload(attachment_file.read())
115 |                 encoders.encode_base64(part)
116 |                 part.add_header(
117 |                     'Content-Disposition',
118 |                     f'attachment; filename="{attachment_path.name}"'
119 |                 )
120 |                 message.attach(part)
121 | 
122 |     # Encode message and send
123 |     raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
124 |     send_message = {'raw': raw_message}
125 | 
126 |     try:
127 |         sent_message = service.users().messages().send(userId='me', body=send_message).execute()
128 |         print(f"Email sent successfully to {to} with attachments.")
129 |         return sent_message
130 |     except Exception as e:
131 |         print(f"An error occurred: {e}")
132 |         return None
133 | def download_attachments_parent(service, user_id, msg_id, target_dir):
134 |     message = service.users().messages().get(userId=user_id, id=msg_id).execute()
135 |     for part in message['payload']['parts']:
136 |         if part['filename']:
137 |             att_id = part['body']['attachmentId']
138 |             att = service.users().messages().attachments().get(userId=user_id, messageId=msg_id, id=att_id).execute()
139 |             data = att['data']
140 |             file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))
141 |             file_path = os.path.join(target_dir, part['filename'])
142 |             print('Saving attachment to:', file_path)
143 |             with open(file_path, 'wb') as f:
144 |                 f.write(file_data)
145 | 
146 | def download_attachments_all(service, user_id, msg_id, target_dir):
147 |     thread = service.users().threads().get(userId=user_id, id=msg_id).execute()
148 |     for message in thread['messages']:
149 |         for part in message['payload']['parts']:
150 |             if part['filename']:
151 |                 att_id = part['body']['attachmentId']
152 |                 att = service.users().messages().attachments().get(userId=user_id, messageId=message['id'], id=att_id).execute()
153 |                 data = att['data']
154 |                 file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))
155 |                 file_path = os.path.join(target_dir, part['filename'])
156 |                 print('Saving attachment to:', file_path)
157 |                 with open(file_path, 'wb') as f:
158 |                     f.write(file_data)
159 | def search_emails(service, query, user_id='me', max_results=5):
160 |     messages = []
161 |     next_page_token = None
162 | 
163 |     while True:
164 |         result = service.users().messages().list(
165 |             userId=user_id,
166 |             q=query,
167 |             maxResults=min(500, max_results - len(messages)) if max_results else 500,
168 |             pageToken=next_page_token
169 |         ).execute()
170 | 
171 |         messages.extend(result.get('messages', []))
172 | 
173 |         next_page_token = result.get('nextPageToken')
174 | 
175 |         if not next_page_token or (max_results and len(messages) >= max_results):
176 |             break
177 | 
178 |     return messages[:max_results] if max_results else messages
179 | 
180 | def search_email_conversations(service, query, user_id='me', max_results=5):
181 |     conversations = []
182 |     next_page_token = None
183 | 
184 |     while True:
185 |         result = service.users().threads().list(
186 |             userId=user_id,
187 |             q=query,
188 |             maxResults=min(500, max_results - len(conversations)) if max_results else 500,
189 |             pageToken=next_page_token
190 |         ).execute()
191 | 
192 |         conversations.extend(result.get('threads', []))
193 | 
194 |         next_page_token = result.get('nextPageToken')
195 | 
196 |         if not next_page_token or (max_results and len(conversations) >= max_results):
197 |             break
198 | 
199 |     return conversations[:max_results] if max_results else conversations
200 | 
```

--------------------------------------------------------------------------------
/gmail_server.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import logging
  3 | from pathlib import Path
  4 | from typing import List, Optional, Dict, Any
  5 | from mcp.server.fastmcp import FastMCP, Context
  6 | 
  7 | from gmail_api import (
  8 |     init_gmail_service,
  9 |     get_email_message_details,
 10 |     search_emails,
 11 |     search_email_conversations,
 12 |     send_email,
 13 |     get_email_messages,
 14 |     download_attachments_parent,
 15 |     download_attachments_all
 16 | )
 17 | 
 18 | # Configure logging
 19 | logging.basicConfig(
 20 |     level=logging.INFO,
 21 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
 22 |     handlers=[
 23 |         logging.FileHandler('gmail_mcp.log'),
 24 |         logging.StreamHandler()
 25 |     ]
 26 | )
 27 | logger = logging.getLogger('gmail_mcp')
 28 | 
 29 | # Initialize MCP Server
 30 | mcp = FastMCP(
 31 |     "Gmail MCP Server",
 32 |     dependencies=["google-api-python-client", "google-auth-oauthlib"]
 33 | )
 34 | 
 35 | # Gmail Service Initialization
 36 | CLIENT_FILE = 'client_secret.json'
 37 | 
 38 | def get_gmail_service(email_identifier: str):
 39 |     try:
 40 |         service = init_gmail_service(CLIENT_FILE, prefix=f'_{email_identifier}')
 41 |         if not service:
 42 |             raise ValueError(f"Failed to initialize Gmail service for {email_identifier}")
 43 |         return service
 44 |     except Exception as e:
 45 |         logger.error(f"Error initializing Gmail service: {str(e)}")
 46 |         raise
 47 | 
 48 | # Resources
 49 | @mcp.resource("gmail://inbox/{email_identifier}")
 50 | async def get_inbox(email_identifier: str) -> Dict[str, Any]:
 51 |     """Get latest emails from inbox"""
 52 |     try:
 53 |         logger.info(f"Fetching inbox for {email_identifier}")
 54 |         service = get_gmail_service(email_identifier)
 55 |         messages, next_page = get_email_messages(service, max_results=10)
 56 |         emails = []
 57 |         for msg in messages:
 58 |             details = get_email_message_details(service, msg['id'])
 59 |             if details:
 60 |                 emails.append(details)
 61 |         return {
 62 |             "success": True,
 63 |             "emails": emails,
 64 |             "has_more": bool(next_page)
 65 |         }
 66 |     except Exception as e:
 67 |         logger.error(f"Error fetching inbox: {str(e)}")
 68 |         return {"success": False, "message": str(e)}
 69 | 
 70 | @mcp.resource("gmail://email/{email_identifier}/{msg_id}")
 71 | async def get_email_details(email_identifier: str, msg_id: str) -> Dict[str, Any]:
 72 |     """Get detailed information about a specific email"""
 73 |     try:
 74 |         logger.info(f"Fetching email details for ID {msg_id}")
 75 |         service = get_gmail_service(email_identifier)
 76 |         details = get_email_message_details(service, msg_id)
 77 |         if details:
 78 |             return {"success": True, "email": details}
 79 |         return {"success": False, "message": "Email not found"}
 80 |     except Exception as e:
 81 |         logger.error(f"Error fetching email details: {str(e)}")
 82 |         return {"success": False, "message": str(e)}
 83 | 
 84 | @mcp.resource("gmail://attachments/{email_identifier}/{msg_id}")
 85 | async def list_attachments(email_identifier: str, msg_id: str) -> Dict[str, Any]:
 86 |     """List attachments for a specific email"""
 87 |     try:
 88 |         logger.info(f"Listing attachments for email {msg_id}")
 89 |         service = get_gmail_service(email_identifier)
 90 |         details = get_email_message_details(service, msg_id)
 91 |         if details and details.get('has_attachments'):
 92 |             return {
 93 |                 "success": True,
 94 |                 "has_attachments": True,
 95 |                 "message_id": msg_id
 96 |             }
 97 |         return {
 98 |             "success": True,
 99 |             "has_attachments": False
100 |         }
101 |     except Exception as e:
102 |         logger.error(f"Error listing attachments: {str(e)}")
103 |         return {"success": False, "message": str(e)}
104 | 
105 | # Tools
106 | @mcp.tool()
107 | async def send_gmail(
108 |     email_identifier: str, 
109 |     to: str, 
110 |     subject: str, 
111 |     body: str, 
112 |     attachment_paths: Optional[List[str]] = None
113 | ) -> Dict[str, Any]:
114 |     """Send an email with optional attachments"""
115 |     try:
116 |         logger.info(f"Sending email to {to} from {email_identifier}")
117 |         service = get_gmail_service(email_identifier)
118 |         
119 |         # Validate attachment paths
120 |         if attachment_paths:
121 |             for path in attachment_paths:
122 |                 if not os.path.exists(path):
123 |                     return {
124 |                         "success": False,
125 |                         "message": f"Attachment not found: {path}"
126 |                     }
127 |         
128 |         response = send_email(
129 |             service=service,
130 |             to=to,
131 |             subject=subject,
132 |             body=body,
133 |             body_type='plain',
134 |             attachment_paths=attachment_paths
135 |         )
136 |         
137 |         if response:
138 |             return {
139 |                 "success": True,
140 |                 "message": f"Email sent successfully to {to}",
141 |                 "message_id": response.get('id', 'unknown')
142 |             }
143 |         return {
144 |             "success": False,
145 |             "message": "Failed to send email"
146 |         }
147 |     except Exception as e:
148 |         logger.error(f"Error sending email: {str(e)}")
149 |         return {"success": False, "message": str(e)}
150 | 
151 | @mcp.tool()
152 | async def search_email_tool(
153 |     email_identifier: str,
154 |     query: str = '',
155 |     max_results: int = 30,
156 |     include_conversations: bool = True
157 | ) -> Dict[str, Any]:
158 |     """Search emails with optional conversation inclusion"""
159 |     try:
160 |         logger.info(f"Searching emails for {email_identifier} with query: {query}")
161 |         service = get_gmail_service(email_identifier)
162 |         
163 |         emails = []
164 |         # Search regular emails
165 |         messages = search_emails(service, query, max_results=max_results)
166 |         for msg in messages:
167 |             details = get_email_message_details(service, msg['id'])
168 |             if details:
169 |                 emails.append(details)
170 |                 
171 |         # Search conversations if requested
172 |         if include_conversations:
173 |             conversations = search_email_conversations(service, query, max_results=max_results)
174 |             for conv in conversations:
175 |                 details = get_email_message_details(service, conv['id'])
176 |                 if details:
177 |                     emails.append(details)
178 |                     
179 |         return {
180 |             "success": True,
181 |             "message": f"Found {len(emails)} emails",
182 |             "emails": emails
183 |         }
184 |     except Exception as e:
185 |         logger.error(f"Error searching emails: {str(e)}")
186 |         return {"success": False, "message": str(e), "emails": []}
187 | 
188 | @mcp.tool()
189 | async def read_latest_emails(
190 |     email_identifier: str,
191 |     max_results: int = 5,
192 |     download_attachments: bool = False
193 | ) -> Dict[str, Any]:
194 |     """Read latest emails with optional attachment download"""
195 |     try:
196 |         logger.info(f"Reading latest {max_results} emails for {email_identifier}")
197 |         service = get_gmail_service(email_identifier)
198 |         
199 |         messages, _ = get_email_messages(service, max_results=max_results)
200 |         emails = []
201 |         
202 |         attachment_dir = Path('./downloaded_attachments')
203 |         if download_attachments:
204 |             attachment_dir.mkdir(exist_ok=True)
205 |             
206 |         for msg in messages:
207 |             details = get_email_message_details(service, msg['id'])
208 |             if details:
209 |                 if download_attachments and details.get('has_attachments'):
210 |                     download_attachments_parent(
211 |                         service, 
212 |                         user_id='me',
213 |                         msg_id=msg['id'],
214 |                         target_dir=str(attachment_dir)
215 |                     )
216 |                     details['attachments_downloaded'] = True
217 |                     details['attachment_dir'] = str(attachment_dir)
218 |                 emails.append(details)
219 |         
220 |         return {
221 |             "success": True,
222 |             "message": f"Retrieved {len(emails)} latest emails",
223 |             "emails": emails,
224 |             "attachment_downloads": download_attachments
225 |         }
226 |     except Exception as e:
227 |         logger.error(f"Error reading latest emails: {str(e)}")
228 |         return {"success": False, "message": str(e), "emails": []}
229 | 
230 | @mcp.tool()
231 | async def download_email_attachments(
232 |     email_identifier: str,
233 |     msg_id: str,
234 |     download_all_in_thread: bool = False
235 | ) -> Dict[str, Any]:
236 |     """Download attachments for a specific email or its entire thread"""
237 |     try:
238 |         logger.info(f"Downloading attachments for email {msg_id}")
239 |         service = get_gmail_service(email_identifier)
240 |         
241 |         attachment_dir = Path('./downloaded_attachments')
242 |         attachment_dir.mkdir(exist_ok=True)
243 |         
244 |         if download_all_in_thread:
245 |             download_attachments_all(
246 |                 service,
247 |                 user_id='me',
248 |                 msg_id=msg_id,
249 |                 target_dir=str(attachment_dir)
250 |             )
251 |         else:
252 |             download_attachments_parent(
253 |                 service,
254 |                 user_id='me',
255 |                 msg_id=msg_id,
256 |                 target_dir=str(attachment_dir)
257 |             )
258 |             
259 |         return {
260 |             "success": True,
261 |             "message": "Attachments downloaded successfully",
262 |             "directory": str(attachment_dir),
263 |             "thread_downloaded": download_all_in_thread
264 |         }
265 |     except Exception as e:
266 |         logger.error(f"Error downloading attachments: {str(e)}")
267 |         return {"success": False, "message": str(e)}
268 | 
269 | # Prompts
270 | @mcp.prompt()
271 | def compose_email_prompt() -> Dict[str, Any]:
272 |     """Guide for composing and sending an email"""
273 |     return {
274 |         "description": "Guide for composing and sending an email",
275 |         "messages": [
276 |             {
277 |                 "role": "system",
278 |                 "content": """You're helping the user compose and send an email. Make sure to collect:
279 | 1. Email identifier (the account sending the email)
280 | 2. Recipient's email address
281 | 3. Subject line
282 | 4. Email body content
283 | 5. Any attachments (optional) - provide full file paths"""
284 |             },
285 |             {
286 |                 "role": "user",
287 |                 "content": "I need to send an email."
288 |             },
289 |             {
290 |                 "role": "assistant",
291 |                 "content": """I'll help you compose and send an email. Please provide:
292 | 
293 | 1. Which email account should send this? (email identifier)
294 | 2. Who are you sending it to? (recipient's email)
295 | 3. What's the subject of your email?
296 | 4. What would you like to say in the email?
297 | 5. Do you need to attach any files? If yes, please provide the file paths."""
298 |             }
299 |         ]
300 |     }
301 | 
302 | @mcp.prompt()
303 | def search_email_prompt() -> Dict[str, Any]:
304 |     """Guide for searching emails with various criteria"""
305 |     return {
306 |         "description": "Guide for searching emails",
307 |         "messages": [
308 |             {
309 |                 "role": "system",
310 |                 "content": """You're helping the user search through their emails. Collect:
311 | 1. Email identifier (which account to search)
312 | 2. Search criteria (from, to, subject, date range, etc.)
313 | 3. Maximum number of results needed
314 | 4. Whether to include conversation threads"""
315 |             },
316 |             {
317 |                 "role": "user",
318 |                 "content": "I want to search my emails."
319 |             },
320 |             {
321 |                 "role": "assistant",
322 |                 "content": """I'll help you search your emails. Please specify:
323 | 
324 | 1. Which email account do you want to search? (email identifier)
325 | 2. What are you looking for? You can search by:
326 |    - Sender (from:[email protected])
327 |    - Subject (subject:meeting)
328 |    - Date range (after:2024/01/01 before:2024/02/01)
329 |    - Has attachment (has:attachment)
330 |    Or combine these criteria.
331 | 3. How many results would you like to see? (default is 30)
332 | 4. Should I include conversation threads in the search? (yes/no)"""
333 |             }
334 |         ]
335 |     }
336 | 
337 | @mcp.prompt()
338 | def read_latest_emails_prompt() -> Dict[str, Any]:
339 |     """Guide for reading recent emails with optional attachment handling"""
340 |     return {
341 |         "description": "Guide for reading latest emails",
342 |         "messages": [
343 |             {
344 |                 "role": "system",
345 |                 "content": """You're helping the user read their recent emails. Collect:
346 | 1. Email identifier (which account to read)
347 | 2. Number of emails to retrieve
348 | 3. Whether to automatically download attachments"""
349 |             },
350 |             {
351 |                 "role": "user",
352 |                 "content": "I want to check my recent emails."
353 |             },
354 |             {
355 |                 "role": "assistant",
356 |                 "content": """I'll help you check your recent emails. Please specify:
357 | 
358 | 1. Which email account do you want to check? (email identifier)
359 | 2. How many recent emails would you like to see? (default is 5)
360 | 3. Should I automatically download any attachments found? (yes/no)
361 |    Note: Attachments will be saved to a 'downloaded_attachments' folder."""
362 |             }
363 |         ]
364 |     }
365 | 
366 | @mcp.prompt()
367 | def download_attachments_prompt() -> Dict[str, Any]:
368 |     """Guide for downloading email attachments"""
369 |     return {
370 |         "description": "Guide for downloading email attachments",
371 |         "messages": [
372 |             {
373 |                 "role": "system",
374 |                 "content": """You're helping the user download email attachments. Collect:
375 | 1. Email identifier (which account to use)
376 | 2. Message ID of the email
377 | 3. Whether to download attachments from the entire conversation thread"""
378 |             },
379 |             {
380 |                 "role": "user",
381 |                 "content": "I want to download attachments from an email."
382 |             },
383 |             {
384 |                 "role": "assistant",
385 |                 "content": """I'll help you download email attachments. Please provide:
386 | 
387 | 1. Which email account has the attachments? (email identifier)
388 | 2. What's the Message ID of the email? (You can get this from search results)
389 | 3. Do you want to download attachments from the entire conversation thread? (yes/no)
390 |    Note: Files will be saved to a 'downloaded_attachments' folder."""
391 |             }
392 |         ]
393 |     }
394 | 
395 | if __name__ == "__main__":
396 |     try:
397 |         logger.info("Starting Gmail MCP server...")
398 |         mcp.run()
399 |     except KeyboardInterrupt:
400 |         logger.info("Server shutting down gracefully...")
401 |     except Exception as e:
402 |         logger.error(f"Fatal server error: {str(e)}")
403 | 
```