# Directory Structure ``` ├── .replit ├── api │ └── index.js ├── build.sh ├── Dockerfile ├── LICENSE ├── package.json ├── public │ └── index.html ├── pyproject.toml ├── README.md ├── replit.nix ├── requirements.txt ├── runtime.txt ├── server.js ├── smithery.yaml ├── src │ └── mcp_server_hubspot │ ├── __init__.py │ ├── debug.py │ └── server.py └── vercel.json ``` # Files -------------------------------------------------------------------------------- /.replit: -------------------------------------------------------------------------------- ``` run = "npm start" entrypoint = "server.js" language = "nodejs" [nix] channel = "stable-22_11" [env] PORT = "3000" [packager] language = "nodejs" [packager.features] packageSearch = true guessImports = true [languages.js] pattern = "**/*.js" syntax = "javascript" [languages.js.languageServer] start = [ "typescript-language-server", "--stdio" ] [deployment] run = ["sh", "-c", "npm start"] deploymentTarget = "cloudrun" ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # HubSpot MCP Server [](https://hub.docker.com/r/buryhuang/mcp-hubspot) [](https://smithery.ai/server/mcp-hubspot/prod) ## Overview A Model Context Protocol (MCP) server implementation that provides integration with HubSpot CRM. This server enables AI models to interact with HubSpot data and operations through a standardized interface. For more information about the Model Context Protocol and how it works, see [Anthropic's MCP documentation](https://www.anthropic.com/news/model-context-protocol). <a href="https://glama.ai/mcp/servers/vpoifk4jai"><img width="380" height="200" src="https://glama.ai/mcp/servers/vpoifk4jai/badge" alt="HubSpot Server MCP server" /></a> ## Components ### Resources The server exposes the following resources: * `hubspot://hubspot_contacts`: A dynamic resource that provides access to HubSpot contacts * `hubspot://hubspot_companies`: A dynamic resource that provides access to HubSpot companies * `hubspot://hubspot_recent_engagements`: A dynamic resource that provides access to HubSpot engagements from the last 3 days All resources auto-update as their respective objects are modified in HubSpot. ### Example Prompts - Create Hubspot contacts by copying from LinkedIn profile webpage: ``` Create HubSpot contacts and companies from following: John Doe Software Engineer at Tech Corp San Francisco Bay Area • 500+ connections Experience Tech Corp Software Engineer Jan 2020 - Present · 4 yrs San Francisco, California Previous Company Inc. Senior Developer 2018 - 2020 · 2 yrs Education University of California, Berkeley Computer Science, BS 2014 - 2018 ``` - Get latest activities for your company: ``` What's happening latestly with my pipeline? ``` ### Tools The server offers several tools for managing HubSpot objects: #### Contact Management Tools * `hubspot_get_contacts` * Retrieve contacts from HubSpot * No input required * Returns: Array of contact objects * `hubspot_create_contact` * Create a new contact in HubSpot (checks for duplicates before creation) * Input: * `firstname` (string): Contact's first name * `lastname` (string): Contact's last name * `email` (string, optional): Contact's email address * `properties` (dict, optional): Additional contact properties * Example: `{"phone": "123456789", "company": "HubSpot"}` * Behavior: * Checks for existing contacts with the same first name and last name * If `company` is provided in properties, also checks for matches with the same company * Returns existing contact details if a match is found * Creates new contact only if no match is found #### Company Management Tools * `hubspot_get_companies` * Retrieve companies from HubSpot * No input required * Returns: Array of company objects * `hubspot_create_company` * Create a new company in HubSpot (checks for duplicates before creation) * Input: * `name` (string): Company name * `properties` (dict, optional): Additional company properties * Example: `{"domain": "example.com", "industry": "Technology"}` * Behavior: * Checks for existing companies with the same name * Returns existing company details if a match is found * Creates new company only if no match is found * `hubspot_get_company_activity` * Get activity history for a specific company * Input: * `company_id` (string): HubSpot company ID * Returns: Array of activity objects #### Engagement Tools * `hubspot_get_recent_engagements` * Get HubSpot engagements from all companies and contacts from the last 3 days * No input required * Returns: Array of engagement objects with full metadata ## Multi-User Support This MCP server is designed to work with multiple HubSpot users, each with their own access token. The server does not use a global environment variable for the access token. Instead, each request to the MCP server should include the user's specific access token in one of the following ways: 1. In the request header: `X-HubSpot-Access-Token: your-token-here` 2. In the request body as `accessToken`: `{"accessToken": "your-token-here"}` 3. In the request body as `hubspotAccessToken`: `{"hubspotAccessToken": "your-token-here"}` This design allows you to store user tokens in your own backend (e.g., Supabase) and pass them along with each request. ### Example Multi-User Integration ```javascript // Example of how to use this MCP server in a multi-user setup async function makeHubSpotRequest(userId, action, params) { // Retrieve the user's HubSpot token from your database const userToken = await getUserHubSpotToken(userId); // Make request to MCP server with the user's token const response = await fetch('https://your-mcp-server.vercel.app/', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-HubSpot-Access-Token': userToken }, body: JSON.stringify({ action, ...params }) }); return await response.json(); } ``` ## Setup ### Prerequisites You'll need a HubSpot access token for each user. You can obtain this by: 1. Creating a private app in your HubSpot account: Follow the [HubSpot Private Apps Guide](https://developers.hubspot.com/docs/guides/apps/private-apps/overview) - Go to your HubSpot account settings - Navigate to Integrations > Private Apps - Click "Create private app" - Fill in the basic information: - Name your app - Add description - Upload logo (optional) - Define required scopes: - oauth (required) - Optional scopes: - crm.dealsplits.read_write - crm.objects.companies.read - crm.objects.companies.write - crm.objects.contacts.read - crm.objects.contacts.write - crm.objects.deals.read - Review and create the app - Copy the generated access token Note: Keep your access token secure and never commit it to version control. ### Docker Installation You can either build the image locally or pull it from Docker Hub. The image is built for the Linux platform. #### Supported Platforms - Linux/amd64 - Linux/arm64 - Linux/arm/v7 #### Option 1: Pull from Docker Hub ```bash docker pull buryhuang/mcp-hubspot:latest ``` #### Option 2: Build Locally ```bash docker build -t mcp-hubspot . ``` Run the container: ```bash docker run \ buryhuang/mcp-hubspot:latest ``` ## Cross-Platform Publishing To publish the Docker image for multiple platforms, you can use the `docker buildx` command. Follow these steps: 1. **Create a new builder instance** (if you haven't already): ```bash docker buildx create --use ``` 2. **Build and push the image for multiple platforms**: ```bash docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t buryhuang/mcp-hubspot:latest --push . ``` 3. **Verify the image is available for the specified platforms**: ```bash docker buildx imagetools inspect buryhuang/mcp-hubspot:latest ``` ## Usage with Claude Desktop ### Installing via Smithery To install mcp-hubspot for Claude Desktop automatically via [Smithery](https://smithery.ai/server/mcp-hubspot/prod): ```bash npx -y @smithery/cli@latest install mcp-hubspot --client claude ``` ### Docker Usage ```json { "mcpServers": { "hubspot": { "command": "docker", "args": [ "run", "-i", "--rm", "buryhuang/mcp-hubspot:latest" ] } } } ``` ## Development To set up the development environment: ```bash pip install -e . ``` ## License This project is licensed under the MIT License. ``` -------------------------------------------------------------------------------- /runtime.txt: -------------------------------------------------------------------------------- ``` python-3.9 ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` hubspot-api-client>=8.0.0 python-dotenv>=1.0.0 mcp>=0.0.1 pydantic>=2.0.0 python-dateutil>=2.8.2 ``` -------------------------------------------------------------------------------- /vercel.json: -------------------------------------------------------------------------------- ```json { "version": 2, "functions": { "api/index.js": { "memory": 1024, "maxDuration": 10 } }, "routes": [ { "src": "/(.*)", "dest": "/api/index.js" } ] } ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Use Python base image FROM python:3.10-slim-bookworm # Install the project into `/app` WORKDIR /app # Copy the entire project COPY . /app # Install the package RUN pip install --no-cache-dir . # Run the server ENTRYPOINT ["mcp-server-hubspot"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-hubspot" version = "0.1.0" description = "A simple Hubspot MCP server" readme = "README.md" requires-python = ">=3.10" dependencies = ["mcp>=1.0.0", "hubspot-api-client>=8.1.0", "python-dotenv>=1.0.0"] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.uv] dev-dependencies = ["pyright>=1.1.389"] [project.scripts] mcp-server-hubspot = "mcp_server_hubspot:main" ``` -------------------------------------------------------------------------------- /package.json: -------------------------------------------------------------------------------- ```json { "name": "hubspot-mcp", "version": "1.0.0", "description": "HubSpot MCP Server for Windsurf", "main": "server.js", "scripts": { "start": "node server.js", "install-python-deps": "pip install -r requirements.txt", "build": "chmod +x ./build.sh && ./build.sh", "postinstall": "npm run install-python-deps" }, "engines": { "node": ">=14" }, "dependencies": { "express": "^4.18.2" } } ``` -------------------------------------------------------------------------------- /build.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # Install Python dependencies python -m pip install -r requirements.txt 2>/dev/null || pip3 install -r requirements.txt 2>/dev/null || echo "Warning: Failed to install Python dependencies" # Create necessary directories mkdir -p api # Create the public directory that Vercel requires mkdir -p public touch public/.gitkeep # Print debug info echo "Build script executed successfully" echo "Current directory: $(pwd)" echo "Files in current directory:" ls -la echo "Files in public directory:" ls -la public # Exit successfully exit 0 ``` -------------------------------------------------------------------------------- /src/mcp_server_hubspot/__init__.py: -------------------------------------------------------------------------------- ```python import argparse import asyncio import logging from . import server logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger('mcp_hubspot') def main(): logger.debug("Starting mcp-server-hubspot main()") parser = argparse.ArgumentParser(description='HubSpot MCP Server') parser.add_argument('--access-token', help='HubSpot access token') args = parser.parse_args() logger.debug(f"Access token from args: {args.access_token}") # Run the async main function logger.debug("About to run server.main()") asyncio.run(server.main(args.access_token)) logger.debug("Server main() completed") if __name__ == "__main__": main() # Expose important items at package level __all__ = ["main", "server"] ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/deployments startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. # This MCP server requires the following HubSpot scopes: # Required: oauth # Optional: crm.dealsplits.read_write crm.objects.companies.read crm.objects.companies.write # crm.objects.contacts.read crm.objects.contacts.write crm.objects.deals.read type: object required: - hubspotAccessToken properties: hubspotAccessToken: type: string description: The access token for the HubSpot API. commandFunction: # A function that produces the CLI command to start the MCP on stdio. |- config => ({ command: 'docker', args: ['run', '--rm', '-e', `HUBSPOT_ACCESS_TOKEN=${config.hubspotAccessToken}`, 'buryhuang/mcp-hubspot:latest'] }) ``` -------------------------------------------------------------------------------- /src/mcp_server_hubspot/debug.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # Debug script to test Python environment in Replit import sys import os import json import traceback def main(): # Print debug information about the environment debug_info = { "python_version": sys.version, "python_path": sys.executable, "cwd": os.getcwd(), "env_vars": {k: v for k, v in os.environ.items() if not k.startswith("_")}, "sys_path": sys.path, "arguments": sys.argv } # Try to import important modules try: import hubspot debug_info["hubspot_version"] = hubspot.__version__ except Exception as e: debug_info["hubspot_import_error"] = str(e) debug_info["hubspot_traceback"] = traceback.format_exc() try: from mcp import server debug_info["mcp_found"] = True except Exception as e: debug_info["mcp_import_error"] = str(e) debug_info["mcp_traceback"] = traceback.format_exc() print(json.dumps(debug_info, indent=2, default=str)) if __name__ == "__main__": try: main() except Exception as e: error_info = { "error": str(e), "traceback": traceback.format_exc() } print(json.dumps(error_info, indent=2)) ``` -------------------------------------------------------------------------------- /public/index.html: -------------------------------------------------------------------------------- ```html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>HubSpot MCP Server</title> <style> body { font-family: system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; line-height: 1.6; } h1 { color: #333; border-bottom: 1px solid #eee; padding-bottom: 10px; } pre { background-color: #f5f5f5; padding: 10px; border-radius: 5px; overflow-x: auto; } code { font-family: 'Courier New', Courier, monospace; } </style> </head> <body> <h1>HubSpot MCP Server</h1> <p>This is a Machine Communication Protocol (MCP) server for HubSpot integration.</p> <h2>API Endpoints</h2> <ul> <li><code>/ping</code> - Health check endpoint</li> <li><code>/echo</code> - Debug endpoint that echoes back the request body</li> <li><code>/</code> - Main endpoint for MCP requests</li> </ul> <h2>Authentication</h2> <p>All requests must include a HubSpot access token in one of the following ways:</p> <ul> <li>Header: <code>X-HubSpot-Access-Token: your-token-here</code></li> <li>Request body: <code>{"accessToken": "your-token-here"}</code></li> <li>Request body: <code>{"hubspotAccessToken": "your-token-here"}</code></li> </ul> <h2>Example Request</h2> <pre><code>curl -X POST https://your-vercel-domain.vercel.app/ \ -H "Content-Type: application/json" \ -H "X-HubSpot-Access-Token: your-token-here" \ -d '{"action": "get_contacts"}' </code></pre> </body> </html> ``` -------------------------------------------------------------------------------- /api/index.js: -------------------------------------------------------------------------------- ```javascript // API handler for both Vercel serverless and Replit const { spawn } = require('child_process'); const path = require('path'); const fs = require('fs'); // Detect if running on Vercel or Replit const isVercel = process.env.VERCEL === '1'; // Express middleware style handler for Replit const handleRequest = async (req, res) => { // CORS headers res.setHeader('Access-Control-Allow-Credentials', true); res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET,OPTIONS,POST'); res.setHeader('Access-Control-Allow-Headers', 'X-CSRF-Token, X-Requested-With, Accept, Accept-Version, Content-Length, Content-MD5, Content-Type, Date, X-Api-Version, X-HubSpot-Access-Token'); // Handle OPTIONS request (preflight) if (req.method === 'OPTIONS') { res.status(200).end(); return; } // Health check endpoint if (req.method === 'GET' && (req.url === '/health' || req.path === '/health')) { return res.status(200).json({ status: 'ok' }); } // Test endpoint if (req.method === 'GET' && (req.url === '/ping' || req.path === '/ping')) { return res.status(200).send('pong'); } // Debug endpoint if (req.method === 'GET' && (req.url === '/debug' || req.path === '/debug')) { const env = process.env; const pythonInfo = {}; try { const pythonVersionCmd = spawn('python', ['--version']); let pythonVersion = ''; pythonVersionCmd.stdout.on('data', (data) => { pythonVersion += data.toString(); }); await new Promise((resolve) => { pythonVersionCmd.on('close', (code) => { pythonInfo.versionExitCode = code; pythonInfo.version = pythonVersion.trim(); resolve(); }); }); } catch (e) { pythonInfo.error = e.message; } return res.status(200).json({ nodeVersion: process.version, cwd: process.cwd(), environment: isVercel ? 'Vercel' : 'Replit/Other', pythonInfo, serverPath: path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'), serverExists: fs.existsSync(path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py')) }); } // Main endpoint for MCP requests if (req.method === 'POST') { try { // Get the request body const body = typeof req.body === 'string' ? JSON.parse(req.body) : req.body; // Check for access token in header first, then in body const accessToken = req.headers['x-hubspot-access-token'] || body.accessToken || body.hubspotAccessToken; if (!accessToken) { return res.status(400).json({ error: 'Missing access token', message: 'HubSpot access token must be provided in either the X-HubSpot-Access-Token header or in the request body as accessToken or hubspotAccessToken' }); } // Simple echo for debugging if (req.url === '/echo' || req.path === '/echo') { return res.status(200).json({ receivedBody: body, receivedToken: accessToken ? '[PRESENT]' : '[MISSING]' }); } // Create public directory if it doesn't exist const publicDir = path.join(process.cwd(), 'public'); if (!fs.existsSync(publicDir)) { fs.mkdirSync(publicDir, { recursive: true }); } // Use mock data on Vercel, real data elsewhere if (isVercel) { // Mock responses for Vercel if (body.action === 'get_contacts') { return res.status(200).json({ status: 'success', message: 'Mock contacts data', data: [ { id: '1', firstName: 'John', lastName: 'Doe', email: '[email protected]' }, { id: '2', firstName: 'Jane', lastName: 'Smith', email: '[email protected]' } ] }); } else if (body.action === 'get_companies') { return res.status(200).json({ status: 'success', message: 'Mock companies data', data: [ { id: '101', name: 'Acme Corp', domain: 'acme.com' }, { id: '102', name: 'Globex', domain: 'globex.com' } ] }); } else { // For any other action, return a generic success response return res.status(200).json({ status: 'success', message: `Mock response for action: ${body.action}`, tokenValid: true }); } } else { // Real Python execution (works on Replit but not Vercel) // Get the path to the MCP server script const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'); // Spawn a Python process to run the MCP server, passing the access token as an argument const pythonProcess = spawn('python', [scriptPath, accessToken]); // Set up response buffers let responseData = ''; let errorData = ''; // Send the input to the Python process pythonProcess.stdin.write(JSON.stringify(body) + '\n'); pythonProcess.stdin.end(); // Listen for data from the Python process pythonProcess.stdout.on('data', (data) => { responseData += data.toString(); }); // Listen for errors pythonProcess.stderr.on('data', (data) => { errorData += data.toString(); }); // Wait for the process to exit await new Promise((resolve) => { pythonProcess.on('close', (code) => { resolve(code); }); }); // If there was an error, return it if (errorData) { console.error('Python error:', errorData); return res.status(500).json({ error: 'Internal server error', details: errorData }); } // Try to parse the response as JSON try { const jsonResponse = JSON.parse(responseData); return res.status(200).json(jsonResponse); } catch (e) { // If the response isn't valid JSON, just return it as text return res.status(200).send(responseData); } } } catch (error) { console.error('Error processing request:', error); return res.status(500).json({ error: 'Internal server error', details: error.message }); } } // If we get here, the method is not supported return res.status(405).json({ error: 'Method not allowed' }); }; // Export for both Express and Vercel module.exports = handleRequest; ``` -------------------------------------------------------------------------------- /server.js: -------------------------------------------------------------------------------- ```javascript const express = require('express'); const { spawn } = require('child_process'); const path = require('path'); const fs = require('fs'); const app = express(); const PORT = process.env.PORT || 3000; // Middleware to parse JSON bodies app.use(express.json()); // Serve static files from public directory app.use(express.static('public')); // CORS headers app.use((req, res, next) => { res.setHeader('Access-Control-Allow-Credentials', true); res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET,OPTIONS,POST'); res.setHeader('Access-Control-Allow-Headers', 'X-CSRF-Token, X-Requested-With, Accept, Accept-Version, Content-Length, Content-MD5, Content-Type, Date, X-Api-Version, X-HubSpot-Access-Token'); if (req.method === 'OPTIONS') { return res.status(200).end(); } next(); }); // Handle MCP protocol messages const handleMcpRequest = async (req, res) => { const body = req.body; console.log('Received MCP request:', JSON.stringify(body)); // Extract access token const accessToken = req.headers['x-hubspot-access-token'] || body.accessToken || body.hubspotAccessToken; if (!accessToken) { return res.status(400).json({ error: 'Missing access token', message: 'HubSpot access token must be provided in either the X-HubSpot-Access-Token header or in the request body' }); } // Basic MCP initialization response if (body.mcp === true || body.action === 'initialize') { return res.status(200).json({ mcp: true, version: '1.0.0', name: 'HubSpot MCP Server', status: 'ready' }); } try { // Get the path to the MCP server script const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'); console.log('Script path:', scriptPath); console.log('Script exists:', fs.existsSync(scriptPath)); // Spawn a Python process to run the MCP server const pythonProcess = spawn('python', [scriptPath, accessToken]); let responseData = ''; let errorData = ''; // Send the input to the Python process pythonProcess.stdin.write(JSON.stringify(body) + '\n'); pythonProcess.stdin.end(); // Listen for data from the Python process pythonProcess.stdout.on('data', (data) => { console.log('Python output:', data.toString()); responseData += data.toString(); }); // Listen for errors pythonProcess.stderr.on('data', (data) => { console.error('Python error:', data.toString()); errorData += data.toString(); }); // Wait for the process to exit const exitCode = await new Promise((resolve) => { pythonProcess.on('close', (code) => { console.log('Python process exited with code:', code); resolve(code); }); }); if (exitCode !== 0 || errorData) { console.error('Error output:', errorData); if (exitCode === 127) { // Command not found return res.status(500).json({ error: 'Python execution failed', details: 'Python command not found. Please make sure Python is installed.', errorOutput: errorData }); } return res.status(500).json({ error: 'Internal server error', details: errorData || `Process exited with code ${exitCode}` }); } // Try to parse the response as JSON try { const jsonResponse = JSON.parse(responseData); return res.status(200).json(jsonResponse); } catch (e) { // If we can't parse as JSON, just return the raw output console.log('Could not parse Python output as JSON:', e.message); return res.status(200).send(responseData || 'No output from Python script'); } } catch (error) { console.error('Error processing request:', error); return res.status(500).json({ error: 'Internal server error', details: error.message }); } }; // Health check endpoint app.get('/health', (req, res) => { res.status(200).json({ status: 'ok' }); }); // Test endpoint app.get('/ping', (req, res) => { res.status(200).send('pong'); }); // Debug endpoint app.get('/debug', async (req, res) => { const pythonInfo = {}; try { const pythonVersionCmd = spawn('python', ['--version']); let pythonVersion = ''; pythonVersionCmd.stdout.on('data', (data) => { pythonVersion += data.toString(); }); await new Promise((resolve) => { pythonVersionCmd.on('close', (code) => { pythonInfo.versionExitCode = code; pythonInfo.version = pythonVersion.trim(); resolve(); }); }); } catch (e) { pythonInfo.error = e.message; } res.status(200).json({ nodeVersion: process.version, cwd: process.cwd(), pythonInfo, serverPath: path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'), serverExists: fs.existsSync(path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py')), env: process.env.NODE_ENV || 'development' }); }); // Debug Python environment endpoint app.get('/debug-python', async (req, res) => { try { // Get the path to the debug script const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'debug.py'); console.log('Debug script path:', scriptPath); console.log('Debug script exists:', fs.existsSync(scriptPath)); // Spawn a Python process to run the debug script const pythonProcess = spawn('python', [scriptPath]); let responseData = ''; let errorData = ''; // Listen for data from the Python process pythonProcess.stdout.on('data', (data) => { console.log('Python debug output:', data.toString()); responseData += data.toString(); }); // Listen for errors pythonProcess.stderr.on('data', (data) => { console.error('Python debug error:', data.toString()); errorData += data.toString(); }); // Wait for the process to exit const exitCode = await new Promise((resolve) => { pythonProcess.on('close', (code) => { console.log('Python debug process exited with code:', code); resolve(code); }); }); res.status(200).json({ pythonOutput: responseData, pythonError: errorData, exitCode }); } catch (error) { console.error('Error in debug-python endpoint:', error); res.status(500).json({ error: 'Internal server error', details: error.message }); } }); // Echo endpoint for debugging app.post('/echo', (req, res) => { const accessToken = req.headers['x-hubspot-access-token'] || req.body.accessToken || req.body.hubspotAccessToken; res.status(200).json({ receivedBody: req.body, receivedToken: accessToken ? '[PRESENT]' : '[MISSING]' }); }); // Main MCP endpoint app.post('/', handleMcpRequest); // Start the server app.listen(PORT, () => { console.log(`Server running on port ${PORT}`); }); ``` -------------------------------------------------------------------------------- /src/mcp_server_hubspot/server.py: -------------------------------------------------------------------------------- ```python import logging from typing import Any, Dict, List, Optional import os from dotenv import load_dotenv from hubspot import HubSpot from hubspot.crm.contacts import SimplePublicObjectInputForCreate from hubspot.crm.contacts.exceptions import ApiException from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server import mcp.server.stdio from pydantic import AnyUrl import json from datetime import datetime, timedelta from dateutil.tz import tzlocal logger = logging.getLogger('mcp_hubspot_server') def convert_datetime_fields(obj: Any) -> Any: """Convert any datetime or tzlocal objects to string in the given object""" if isinstance(obj, dict): return {k: convert_datetime_fields(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_datetime_fields(item) for item in obj] elif isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, tzlocal): # Get the current timezone offset offset = datetime.now(tzlocal()).strftime('%z') return f"UTC{offset[:3]}:{offset[3:]}" # Format like "UTC+08:00" or "UTC-05:00" return obj class HubSpotClient: def __init__(self, access_token: str): logger.debug(f"Using access token: {'[MASKED]' if access_token else 'None'}") if not access_token: raise ValueError("HubSpot access token is required. It must be provided as an argument.") # Initialize HubSpot client with the provided token # This allows for multi-user support by passing user-specific tokens self.client = HubSpot(access_token=access_token) def get_contacts(self) -> str: """Get all contacts from HubSpot (requires optional crm.objects.contacts.read scope)""" try: contacts = self.client.crm.contacts.get_all() contacts_dict = [contact.to_dict() for contact in contacts] converted_contacts = convert_datetime_fields(contacts_dict) return json.dumps(converted_contacts) except ApiException as e: logger.error(f"API Exception in get_contacts: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_contacts: {str(e)}") return json.dumps({"error": str(e)}) def get_companies(self) -> str: """Get all companies from HubSpot (requires optional crm.objects.companies.read scope)""" try: companies = self.client.crm.companies.get_all() companies_dict = [company.to_dict() for company in companies] converted_companies = convert_datetime_fields(companies_dict) return json.dumps(converted_companies) except ApiException as e: logger.error(f"API Exception in get_companies: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_companies: {str(e)}") return json.dumps({"error": str(e)}) def get_company_activity(self, company_id: str) -> str: """Get activity history for a specific company (requires optional crm.objects.companies.read scope)""" try: # Note: This method only requires standard read scopes, not sensitive scopes # Step 1: Get all engagement IDs associated with the company using CRM Associations v4 API associated_engagements = self.client.crm.associations.v4.basic_api.get_page( object_type="companies", object_id=company_id, to_object_type="engagements", limit=500 ) # Extract engagement IDs from the associations response engagement_ids = [] if hasattr(associated_engagements, 'results'): for result in associated_engagements.results: engagement_ids.append(result.to_object_id) # Step 2: Get detailed information for each engagement activities = [] for engagement_id in engagement_ids: engagement_response = self.client.api_request({ "method": "GET", "path": f"/engagements/v1/engagements/{engagement_id}" }).json() engagement_data = engagement_response.get('engagement', {}) metadata = engagement_response.get('metadata', {}) # Format the engagement formatted_engagement = { "id": engagement_data.get("id"), "type": engagement_data.get("type"), "created_at": engagement_data.get("createdAt"), "last_updated": engagement_data.get("lastUpdated"), "created_by": engagement_data.get("createdBy"), "modified_by": engagement_data.get("modifiedBy"), "timestamp": engagement_data.get("timestamp"), "associations": engagement_response.get("associations", {}) } # Add type-specific metadata formatting if engagement_data.get("type") == "NOTE": formatted_engagement["content"] = metadata.get("body", "") elif engagement_data.get("type") == "EMAIL": formatted_engagement["content"] = { "subject": metadata.get("subject", ""), "from": { "raw": metadata.get("from", {}).get("raw", ""), "email": metadata.get("from", {}).get("email", ""), "firstName": metadata.get("from", {}).get("firstName", ""), "lastName": metadata.get("from", {}).get("lastName", "") }, "to": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("to", [])], "cc": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("cc", [])], "bcc": [{ "raw": recipient.get("raw", ""), "email": recipient.get("email", ""), "firstName": recipient.get("firstName", ""), "lastName": recipient.get("lastName", "") } for recipient in metadata.get("bcc", [])], "sender": { "email": metadata.get("sender", {}).get("email", "") }, "body": metadata.get("text", "") or metadata.get("html", "") } elif engagement_data.get("type") == "TASK": formatted_engagement["content"] = { "subject": metadata.get("subject", ""), "body": metadata.get("body", ""), "status": metadata.get("status", ""), "for_object_type": metadata.get("forObjectType", "") } elif engagement_data.get("type") == "MEETING": formatted_engagement["content"] = { "title": metadata.get("title", ""), "body": metadata.get("body", ""), "start_time": metadata.get("startTime"), "end_time": metadata.get("endTime"), "internal_notes": metadata.get("internalMeetingNotes", "") } elif engagement_data.get("type") == "CALL": formatted_engagement["content"] = { "body": metadata.get("body", ""), "from_number": metadata.get("fromNumber", ""), "to_number": metadata.get("toNumber", ""), "duration_ms": metadata.get("durationMilliseconds"), "status": metadata.get("status", ""), "disposition": metadata.get("disposition", "") } activities.append(formatted_engagement) # Convert any datetime fields and return converted_activities = convert_datetime_fields(activities) return json.dumps(converted_activities) except ApiException as e: logger.error(f"API Exception: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception: {str(e)}") return json.dumps({"error": str(e)}) async def main(access_token: str): """ Run the HubSpot MCP server. Args: access_token: HubSpot access token Note: This server requires the following HubSpot scopes: Required: - oauth Optional: - crm.dealsplits.read_write - crm.objects.companies.read - crm.objects.companies.write - crm.objects.contacts.read - crm.objects.contacts.write - crm.objects.deals.read """ logger.info("Server starting") hubspot = HubSpotClient(access_token) server = Server("hubspot-manager") @server.list_resources() async def handle_list_resources() -> list[types.Resource]: return [ types.Resource( uri=AnyUrl("hubspot://hubspot_contacts"), name="HubSpot Contacts", description="List of HubSpot contacts (requires optional crm.objects.contacts.read scope)", mimeType="application/json", ), types.Resource( uri=AnyUrl("hubspot://hubspot_companies"), name="HubSpot Companies", description="List of HubSpot companies (requires optional crm.objects.companies.read scope)", mimeType="application/json", ), ] @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> str: if uri.scheme != "hubspot": raise ValueError(f"Unsupported URI scheme: {uri.scheme}") path = str(uri).replace("hubspot://", "") if path == "hubspot_contacts": return str(hubspot.get_contacts()) elif path == "hubspot_companies": return str(hubspot.get_companies()) else: raise ValueError(f"Unknown resource path: {path}") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools""" return [ types.Tool( name="hubspot_get_contacts", description="Get contacts from HubSpot (requires optional crm.objects.contacts.read scope)", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="hubspot_create_contact", description="Create a new contact in HubSpot (requires optional crm.objects.contacts.write scope)", inputSchema={ "type": "object", "properties": { "firstname": {"type": "string", "description": "Contact's first name"}, "lastname": {"type": "string", "description": "Contact's last name"}, "email": {"type": "string", "description": "Contact's email address"}, "properties": {"type": "object", "description": "Additional contact properties"} }, "required": ["firstname", "lastname"] }, ), types.Tool( name="hubspot_get_companies", description="Get companies from HubSpot (requires optional crm.objects.companies.read scope)", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="hubspot_create_company", description="Create a new company in HubSpot (requires optional crm.objects.companies.write scope)", inputSchema={ "type": "object", "properties": { "name": {"type": "string", "description": "Company name"}, "properties": {"type": "object", "description": "Additional company properties"} }, "required": ["name"] }, ), types.Tool( name="hubspot_get_company_activity", description="Get activity history for a specific company (requires optional crm.objects.companies.read scope)", inputSchema={ "type": "object", "properties": { "company_id": {"type": "string", "description": "HubSpot company ID"} }, "required": ["company_id"] }, ), ] @server.call_tool() async def handle_call_tool( name: str, arguments: dict[str, Any] | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """Handle tool execution requests""" try: if name == "hubspot_get_contacts": results = hubspot.get_contacts() return [types.TextContent(type="text", text=str(results))] elif name == "hubspot_create_contact": if not arguments: raise ValueError("Missing arguments for create_contact") firstname = arguments["firstname"] lastname = arguments["lastname"] company = arguments.get("properties", {}).get("company") # Search for existing contacts with same name and company try: from hubspot.crm.contacts import PublicObjectSearchRequest filter_groups = [{ "filters": [ { "propertyName": "firstname", "operator": "EQ", "value": firstname }, { "propertyName": "lastname", "operator": "EQ", "value": lastname } ] }] # Add company filter if provided if company: filter_groups[0]["filters"].append({ "propertyName": "company", "operator": "EQ", "value": company }) search_request = PublicObjectSearchRequest( filter_groups=filter_groups ) search_response = hubspot.client.crm.contacts.search_api.do_search( public_object_search_request=search_request ) if search_response.total > 0: # Contact already exists return [types.TextContent( type="text", text=f"Contact already exists: {search_response.results[0].to_dict()}" )] # If no existing contact found, proceed with creation properties = { "firstname": firstname, "lastname": lastname } # Add email if provided if "email" in arguments: properties["email"] = arguments["email"] # Add any additional properties if "properties" in arguments: properties.update(arguments["properties"]) # Create contact using SimplePublicObjectInputForCreate simple_public_object_input = SimplePublicObjectInputForCreate( properties=properties ) api_response = hubspot.client.crm.contacts.basic_api.create( simple_public_object_input_for_create=simple_public_object_input ) return [types.TextContent(type="text", text=str(api_response.to_dict()))] except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] elif name == "hubspot_get_companies": results = hubspot.get_companies() return [types.TextContent(type="text", text=str(results))] elif name == "hubspot_create_company": if not arguments: raise ValueError("Missing arguments for create_company") company_name = arguments["name"] # Search for existing companies with same name try: from hubspot.crm.companies import PublicObjectSearchRequest search_request = PublicObjectSearchRequest( filter_groups=[{ "filters": [ { "propertyName": "name", "operator": "EQ", "value": company_name } ] }] ) search_response = hubspot.client.crm.companies.search_api.do_search( public_object_search_request=search_request ) if search_response.total > 0: # Company already exists return [types.TextContent( type="text", text=f"Company already exists: {search_response.results[0].to_dict()}" )] # If no existing company found, proceed with creation properties = { "name": company_name } # Add any additional properties if "properties" in arguments: properties.update(arguments["properties"]) # Create company using SimplePublicObjectInputForCreate simple_public_object_input = SimplePublicObjectInputForCreate( properties=properties ) api_response = hubspot.client.crm.companies.basic_api.create( simple_public_object_input_for_create=simple_public_object_input ) return [types.TextContent(type="text", text=str(api_response.to_dict()))] except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] elif name == "hubspot_get_company_activity": if not arguments: raise ValueError("Missing arguments for get_company_activity") results = hubspot.get_company_activity(arguments["company_id"]) return [types.TextContent(type="text", text=results)] else: raise ValueError(f"Unknown tool: {name}") except ApiException as e: return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): logger.info("Server running with stdio transport") await server.run( read_stream, write_stream, InitializationOptions( server_name="hubspot", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": import asyncio import sys if len(sys.argv) != 2: print("Usage: python server.py <access_token>") sys.exit(1) asyncio.run(main(sys.argv[1])) ```