#
tokens: 11117/50000 16/16 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .replit
├── api
│   └── index.js
├── build.sh
├── Dockerfile
├── LICENSE
├── package.json
├── public
│   └── index.html
├── pyproject.toml
├── README.md
├── replit.nix
├── requirements.txt
├── runtime.txt
├── server.js
├── smithery.yaml
├── src
│   └── mcp_server_hubspot
│       ├── __init__.py
│       ├── debug.py
│       └── server.py
└── vercel.json
```

# Files

--------------------------------------------------------------------------------
/.replit:
--------------------------------------------------------------------------------

```
run = "npm start"
entrypoint = "server.js"
language = "nodejs"

[nix]
channel = "stable-22_11"

[env]
PORT = "3000"

[packager]
language = "nodejs"

[packager.features]
packageSearch = true
guessImports = true

[languages.js]
pattern = "**/*.js"
syntax = "javascript"

[languages.js.languageServer]
start = [ "typescript-language-server", "--stdio" ]

[deployment]
run = ["sh", "-c", "npm start"]
deploymentTarget = "cloudrun"

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# HubSpot MCP Server
[![Docker Hub](https://img.shields.io/docker/v/buryhuang/mcp-hubspot?label=Docker%20Hub)](https://hub.docker.com/r/buryhuang/mcp-hubspot) [![smithery badge](https://smithery.ai/badge/mcp-hubspot)](https://smithery.ai/server/mcp-hubspot/prod)

## Overview

A Model Context Protocol (MCP) server implementation that provides integration with HubSpot CRM. This server enables AI models to interact with HubSpot data and operations through a standardized interface.

For more information about the Model Context Protocol and how it works, see [Anthropic's MCP documentation](https://www.anthropic.com/news/model-context-protocol).

<a href="https://glama.ai/mcp/servers/vpoifk4jai"><img width="380" height="200" src="https://glama.ai/mcp/servers/vpoifk4jai/badge" alt="HubSpot Server MCP server" /></a>

## Components

### Resources

The server exposes the following resources:

* `hubspot://hubspot_contacts`: A dynamic resource that provides access to HubSpot contacts
* `hubspot://hubspot_companies`: A dynamic resource that provides access to HubSpot companies
* `hubspot://hubspot_recent_engagements`: A dynamic resource that provides access to HubSpot engagements from the last 3 days

All resources auto-update as their respective objects are modified in HubSpot.

### Example Prompts

- Create Hubspot contacts by copying from LinkedIn profile webpage: 
    ```
    Create HubSpot contacts and companies from following:

    John Doe
    Software Engineer at Tech Corp
    San Francisco Bay Area • 500+ connections
    
    Experience
    Tech Corp
    Software Engineer
    Jan 2020 - Present · 4 yrs
    San Francisco, California
    
    Previous Company Inc.
    Senior Developer
    2018 - 2020 · 2 yrs
    
    Education
    University of California, Berkeley
    Computer Science, BS
    2014 - 2018
    ```

- Get latest activities for your company:
    ```
    What's happening latestly with my pipeline?
    ```



### Tools

The server offers several tools for managing HubSpot objects:

#### Contact Management Tools
* `hubspot_get_contacts`
  * Retrieve contacts from HubSpot
  * No input required
  * Returns: Array of contact objects

* `hubspot_create_contact`
  * Create a new contact in HubSpot (checks for duplicates before creation)
  * Input:
    * `firstname` (string): Contact's first name
    * `lastname` (string): Contact's last name
    * `email` (string, optional): Contact's email address
    * `properties` (dict, optional): Additional contact properties
      * Example: `{"phone": "123456789", "company": "HubSpot"}`
  * Behavior:
    * Checks for existing contacts with the same first name and last name
    * If `company` is provided in properties, also checks for matches with the same company
    * Returns existing contact details if a match is found
    * Creates new contact only if no match is found

#### Company Management Tools
* `hubspot_get_companies`
  * Retrieve companies from HubSpot
  * No input required
  * Returns: Array of company objects

* `hubspot_create_company`
  * Create a new company in HubSpot (checks for duplicates before creation)
  * Input:
    * `name` (string): Company name
    * `properties` (dict, optional): Additional company properties
      * Example: `{"domain": "example.com", "industry": "Technology"}`
  * Behavior:
    * Checks for existing companies with the same name
    * Returns existing company details if a match is found
    * Creates new company only if no match is found

* `hubspot_get_company_activity`
  * Get activity history for a specific company
  * Input:
    * `company_id` (string): HubSpot company ID
  * Returns: Array of activity objects

#### Engagement Tools
* `hubspot_get_recent_engagements`
  * Get HubSpot engagements from all companies and contacts from the last 3 days
  * No input required
  * Returns: Array of engagement objects with full metadata


## Multi-User Support

This MCP server is designed to work with multiple HubSpot users, each with their own access token. The server does not use a global environment variable for the access token.

Instead, each request to the MCP server should include the user's specific access token in one of the following ways:

1. In the request header: `X-HubSpot-Access-Token: your-token-here`
2. In the request body as `accessToken`: `{"accessToken": "your-token-here"}`
3. In the request body as `hubspotAccessToken`: `{"hubspotAccessToken": "your-token-here"}`

This design allows you to store user tokens in your own backend (e.g., Supabase) and pass them along with each request.

### Example Multi-User Integration

```javascript
// Example of how to use this MCP server in a multi-user setup
async function makeHubSpotRequest(userId, action, params) {
  // Retrieve the user's HubSpot token from your database
  const userToken = await getUserHubSpotToken(userId); 

  // Make request to MCP server with the user's token
  const response = await fetch('https://your-mcp-server.vercel.app/', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-HubSpot-Access-Token': userToken
    },
    body: JSON.stringify({
      action,
      ...params
    })
  });
  
  return await response.json();
}
```

## Setup

### Prerequisites

You'll need a HubSpot access token for each user. You can obtain this by:
1. Creating a private app in your HubSpot account:
   Follow the [HubSpot Private Apps Guide](https://developers.hubspot.com/docs/guides/apps/private-apps/overview)
   - Go to your HubSpot account settings
   - Navigate to Integrations > Private Apps
   - Click "Create private app"
   - Fill in the basic information:
     - Name your app
     - Add description
     - Upload logo (optional)
   - Define required scopes:
     - oauth (required)
     
   - Optional scopes:
     - crm.dealsplits.read_write
     - crm.objects.companies.read
     - crm.objects.companies.write
     - crm.objects.contacts.read
     - crm.objects.contacts.write
     - crm.objects.deals.read
   - Review and create the app
   - Copy the generated access token

Note: Keep your access token secure and never commit it to version control.

### Docker Installation

You can either build the image locally or pull it from Docker Hub. The image is built for the Linux platform.

#### Supported Platforms
- Linux/amd64
- Linux/arm64
- Linux/arm/v7

#### Option 1: Pull from Docker Hub
```bash
docker pull buryhuang/mcp-hubspot:latest
```

#### Option 2: Build Locally
```bash
docker build -t mcp-hubspot .
```

Run the container:
```bash
docker run \
  buryhuang/mcp-hubspot:latest
```

## Cross-Platform Publishing

To publish the Docker image for multiple platforms, you can use the `docker buildx` command. Follow these steps:

1. **Create a new builder instance** (if you haven't already):
   ```bash
   docker buildx create --use
   ```

2. **Build and push the image for multiple platforms**:
   ```bash
   docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t buryhuang/mcp-hubspot:latest --push .
   ```

3. **Verify the image is available for the specified platforms**:
   ```bash
   docker buildx imagetools inspect buryhuang/mcp-hubspot:latest
   ```


## Usage with Claude Desktop

### Installing via Smithery

To install mcp-hubspot for Claude Desktop automatically via [Smithery](https://smithery.ai/server/mcp-hubspot/prod):

```bash
npx -y @smithery/cli@latest install mcp-hubspot --client claude
```

### Docker Usage
```json
{
  "mcpServers": {
    "hubspot": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "buryhuang/mcp-hubspot:latest"
      ]
    }
  }
}
```

## Development

To set up the development environment:

```bash
pip install -e .
```

## License

This project is licensed under the MIT License. 

```

--------------------------------------------------------------------------------
/runtime.txt:
--------------------------------------------------------------------------------

```
python-3.9

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
hubspot-api-client>=8.0.0
python-dotenv>=1.0.0
mcp>=0.0.1
pydantic>=2.0.0
python-dateutil>=2.8.2

```

--------------------------------------------------------------------------------
/vercel.json:
--------------------------------------------------------------------------------

```json
{
  "version": 2,
  "functions": {
    "api/index.js": {
      "memory": 1024,
      "maxDuration": 10
    }
  },
  "routes": [
    { "src": "/(.*)", "dest": "/api/index.js" }
  ]
}

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use Python base image
FROM python:3.10-slim-bookworm

# Install the project into `/app`
WORKDIR /app

# Copy the entire project
COPY . /app

# Install the package
RUN pip install --no-cache-dir .

# Run the server
ENTRYPOINT ["mcp-server-hubspot"] 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-hubspot"
version = "0.1.0"
description = "A simple Hubspot MCP server"
readme = "README.md"
requires-python = ">=3.10"
dependencies = ["mcp>=1.0.0", "hubspot-api-client>=8.1.0", "python-dotenv>=1.0.0"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv]
dev-dependencies = ["pyright>=1.1.389"]

[project.scripts]
mcp-server-hubspot = "mcp_server_hubspot:main" 
```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "hubspot-mcp",
  "version": "1.0.0",
  "description": "HubSpot MCP Server for Windsurf",
  "main": "server.js",
  "scripts": {
    "start": "node server.js",
    "install-python-deps": "pip install -r requirements.txt",
    "build": "chmod +x ./build.sh && ./build.sh",
    "postinstall": "npm run install-python-deps"
  },
  "engines": {
    "node": ">=14"
  },
  "dependencies": {
    "express": "^4.18.2"
  }
}

```

--------------------------------------------------------------------------------
/build.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

# Install Python dependencies
python -m pip install -r requirements.txt 2>/dev/null || pip3 install -r requirements.txt 2>/dev/null || echo "Warning: Failed to install Python dependencies"

# Create necessary directories
mkdir -p api

# Create the public directory that Vercel requires
mkdir -p public
touch public/.gitkeep

# Print debug info
echo "Build script executed successfully"
echo "Current directory: $(pwd)"
echo "Files in current directory:"
ls -la
echo "Files in public directory:"
ls -la public

# Exit successfully
exit 0

```

--------------------------------------------------------------------------------
/src/mcp_server_hubspot/__init__.py:
--------------------------------------------------------------------------------

```python
import argparse
import asyncio
import logging
from . import server

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('mcp_hubspot')

def main():
    logger.debug("Starting mcp-server-hubspot main()")
    parser = argparse.ArgumentParser(description='HubSpot MCP Server')
    parser.add_argument('--access-token', help='HubSpot access token')
    args = parser.parse_args()
    
    logger.debug(f"Access token from args: {args.access_token}")
    # Run the async main function
    logger.debug("About to run server.main()")
    asyncio.run(server.main(args.access_token))
    logger.debug("Server main() completed")

if __name__ == "__main__":
    main()

# Expose important items at package level
__all__ = ["main", "server"] 
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/deployments

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    # This MCP server requires the following HubSpot scopes:
    # Required: oauth
    # Optional: crm.dealsplits.read_write crm.objects.companies.read crm.objects.companies.write 
    #          crm.objects.contacts.read crm.objects.contacts.write crm.objects.deals.read
    type: object
    required:
      - hubspotAccessToken
    properties:
      hubspotAccessToken:
        type: string
        description: The access token for the HubSpot API.
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    config => ({ command: 'docker', args: ['run', '--rm', '-e', `HUBSPOT_ACCESS_TOKEN=${config.hubspotAccessToken}`, 'buryhuang/mcp-hubspot:latest'] })
```

--------------------------------------------------------------------------------
/src/mcp_server_hubspot/debug.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# Debug script to test Python environment in Replit

import sys
import os
import json
import traceback

def main():
    # Print debug information about the environment
    debug_info = {
        "python_version": sys.version,
        "python_path": sys.executable,
        "cwd": os.getcwd(),
        "env_vars": {k: v for k, v in os.environ.items() if not k.startswith("_")},
        "sys_path": sys.path,
        "arguments": sys.argv
    }
    
    # Try to import important modules
    try:
        import hubspot
        debug_info["hubspot_version"] = hubspot.__version__
    except Exception as e:
        debug_info["hubspot_import_error"] = str(e)
        debug_info["hubspot_traceback"] = traceback.format_exc()
    
    try:
        from mcp import server
        debug_info["mcp_found"] = True
    except Exception as e:
        debug_info["mcp_import_error"] = str(e)
        debug_info["mcp_traceback"] = traceback.format_exc()
    
    print(json.dumps(debug_info, indent=2, default=str))

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        error_info = {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        print(json.dumps(error_info, indent=2))

```

--------------------------------------------------------------------------------
/public/index.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>HubSpot MCP Server</title>
    <style>
        body {
            font-family: system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            line-height: 1.6;
        }
        h1 {
            color: #333;
            border-bottom: 1px solid #eee;
            padding-bottom: 10px;
        }
        pre {
            background-color: #f5f5f5;
            padding: 10px;
            border-radius: 5px;
            overflow-x: auto;
        }
        code {
            font-family: 'Courier New', Courier, monospace;
        }
    </style>
</head>
<body>
    <h1>HubSpot MCP Server</h1>
    <p>This is a Machine Communication Protocol (MCP) server for HubSpot integration.</p>
    
    <h2>API Endpoints</h2>
    <ul>
        <li><code>/ping</code> - Health check endpoint</li>
        <li><code>/echo</code> - Debug endpoint that echoes back the request body</li>
        <li><code>/</code> - Main endpoint for MCP requests</li>
    </ul>

    <h2>Authentication</h2>
    <p>All requests must include a HubSpot access token in one of the following ways:</p>
    <ul>
        <li>Header: <code>X-HubSpot-Access-Token: your-token-here</code></li>
        <li>Request body: <code>{"accessToken": "your-token-here"}</code></li>
        <li>Request body: <code>{"hubspotAccessToken": "your-token-here"}</code></li>
    </ul>

    <h2>Example Request</h2>
    <pre><code>curl -X POST https://your-vercel-domain.vercel.app/ \
    -H "Content-Type: application/json" \
    -H "X-HubSpot-Access-Token: your-token-here" \
    -d '{"action": "get_contacts"}'
</code></pre>
</body>
</html>

```

--------------------------------------------------------------------------------
/api/index.js:
--------------------------------------------------------------------------------

```javascript
// API handler for both Vercel serverless and Replit
const { spawn } = require('child_process');
const path = require('path');
const fs = require('fs');

// Detect if running on Vercel or Replit
const isVercel = process.env.VERCEL === '1';

// Express middleware style handler for Replit
const handleRequest = async (req, res) => {
  // CORS headers
  res.setHeader('Access-Control-Allow-Credentials', true);
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Methods', 'GET,OPTIONS,POST');
  res.setHeader('Access-Control-Allow-Headers', 'X-CSRF-Token, X-Requested-With, Accept, Accept-Version, Content-Length, Content-MD5, Content-Type, Date, X-Api-Version, X-HubSpot-Access-Token');

  // Handle OPTIONS request (preflight)
  if (req.method === 'OPTIONS') {
    res.status(200).end();
    return;
  }

  // Health check endpoint
  if (req.method === 'GET' && (req.url === '/health' || req.path === '/health')) {
    return res.status(200).json({ status: 'ok' });
  }

  // Test endpoint
  if (req.method === 'GET' && (req.url === '/ping' || req.path === '/ping')) {
    return res.status(200).send('pong');
  }

  // Debug endpoint
  if (req.method === 'GET' && (req.url === '/debug' || req.path === '/debug')) {
    const env = process.env;
    const pythonInfo = {};
    
    try {
      const pythonVersionCmd = spawn('python', ['--version']);
      let pythonVersion = '';
      
      pythonVersionCmd.stdout.on('data', (data) => {
        pythonVersion += data.toString();
      });
      
      await new Promise((resolve) => {
        pythonVersionCmd.on('close', (code) => {
          pythonInfo.versionExitCode = code;
          pythonInfo.version = pythonVersion.trim();
          resolve();
        });
      });
    } catch (e) {
      pythonInfo.error = e.message;
    }
    
    return res.status(200).json({
      nodeVersion: process.version,
      cwd: process.cwd(),
      environment: isVercel ? 'Vercel' : 'Replit/Other',
      pythonInfo,
      serverPath: path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'),
      serverExists: fs.existsSync(path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'))
    });
  }

  // Main endpoint for MCP requests
  if (req.method === 'POST') {
    try {
      // Get the request body
      const body = typeof req.body === 'string' ? JSON.parse(req.body) : req.body;
      
      // Check for access token in header first, then in body
      const accessToken = req.headers['x-hubspot-access-token'] || body.accessToken || body.hubspotAccessToken;
      if (!accessToken) {
        return res.status(400).json({
          error: 'Missing access token',
          message: 'HubSpot access token must be provided in either the X-HubSpot-Access-Token header or in the request body as accessToken or hubspotAccessToken'
        });
      }

      // Simple echo for debugging
      if (req.url === '/echo' || req.path === '/echo') {
        return res.status(200).json({
          receivedBody: body,
          receivedToken: accessToken ? '[PRESENT]' : '[MISSING]'
        });
      }

      // Create public directory if it doesn't exist
      const publicDir = path.join(process.cwd(), 'public');
      if (!fs.existsSync(publicDir)) {
        fs.mkdirSync(publicDir, { recursive: true });
      }

      // Use mock data on Vercel, real data elsewhere
      if (isVercel) {
        // Mock responses for Vercel
        if (body.action === 'get_contacts') {
          return res.status(200).json({
            status: 'success',
            message: 'Mock contacts data',
            data: [
              { id: '1', firstName: 'John', lastName: 'Doe', email: '[email protected]' },
              { id: '2', firstName: 'Jane', lastName: 'Smith', email: '[email protected]' }
            ]
          });
        } else if (body.action === 'get_companies') {
          return res.status(200).json({
            status: 'success',
            message: 'Mock companies data',
            data: [
              { id: '101', name: 'Acme Corp', domain: 'acme.com' },
              { id: '102', name: 'Globex', domain: 'globex.com' }
            ]
          });
        } else {
          // For any other action, return a generic success response
          return res.status(200).json({
            status: 'success',
            message: `Mock response for action: ${body.action}`,
            tokenValid: true
          });
        }
      } else {
        // Real Python execution (works on Replit but not Vercel)
        // Get the path to the MCP server script
        const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py');

        // Spawn a Python process to run the MCP server, passing the access token as an argument
        const pythonProcess = spawn('python', [scriptPath, accessToken]);
        
        // Set up response buffers
        let responseData = '';
        let errorData = '';

        // Send the input to the Python process
        pythonProcess.stdin.write(JSON.stringify(body) + '\n');
        pythonProcess.stdin.end();

        // Listen for data from the Python process
        pythonProcess.stdout.on('data', (data) => {
          responseData += data.toString();
        });

        // Listen for errors
        pythonProcess.stderr.on('data', (data) => {
          errorData += data.toString();
        });

        // Wait for the process to exit
        await new Promise((resolve) => {
          pythonProcess.on('close', (code) => {
            resolve(code);
          });
        });

        // If there was an error, return it
        if (errorData) {
          console.error('Python error:', errorData);
          return res.status(500).json({ error: 'Internal server error', details: errorData });
        }

        // Try to parse the response as JSON
        try {
          const jsonResponse = JSON.parse(responseData);
          return res.status(200).json(jsonResponse);
        } catch (e) {
          // If the response isn't valid JSON, just return it as text
          return res.status(200).send(responseData);
        }
      }
    } catch (error) {
      console.error('Error processing request:', error);
      return res.status(500).json({ error: 'Internal server error', details: error.message });
    }
  }

  // If we get here, the method is not supported
  return res.status(405).json({ error: 'Method not allowed' });
};

// Export for both Express and Vercel
module.exports = handleRequest;

```

--------------------------------------------------------------------------------
/server.js:
--------------------------------------------------------------------------------

```javascript
const express = require('express');
const { spawn } = require('child_process');
const path = require('path');
const fs = require('fs');

const app = express();
const PORT = process.env.PORT || 3000;

// Middleware to parse JSON bodies
app.use(express.json());

// Serve static files from public directory
app.use(express.static('public'));

// CORS headers
app.use((req, res, next) => {
  res.setHeader('Access-Control-Allow-Credentials', true);
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Methods', 'GET,OPTIONS,POST');
  res.setHeader('Access-Control-Allow-Headers', 'X-CSRF-Token, X-Requested-With, Accept, Accept-Version, Content-Length, Content-MD5, Content-Type, Date, X-Api-Version, X-HubSpot-Access-Token');
  
  if (req.method === 'OPTIONS') {
    return res.status(200).end();
  }
  
  next();
});

// Handle MCP protocol messages
const handleMcpRequest = async (req, res) => {
  const body = req.body;
  console.log('Received MCP request:', JSON.stringify(body));
  
  // Extract access token
  const accessToken = req.headers['x-hubspot-access-token'] || body.accessToken || body.hubspotAccessToken;
  
  if (!accessToken) {
    return res.status(400).json({
      error: 'Missing access token',
      message: 'HubSpot access token must be provided in either the X-HubSpot-Access-Token header or in the request body'
    });
  }
  
  // Basic MCP initialization response
  if (body.mcp === true || body.action === 'initialize') {
    return res.status(200).json({
      mcp: true,
      version: '1.0.0',
      name: 'HubSpot MCP Server',
      status: 'ready'
    });
  }
  
  try {
    // Get the path to the MCP server script
    const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py');
    console.log('Script path:', scriptPath);
    console.log('Script exists:', fs.existsSync(scriptPath));

    // Spawn a Python process to run the MCP server
    const pythonProcess = spawn('python', [scriptPath, accessToken]);
    
    let responseData = '';
    let errorData = '';

    // Send the input to the Python process
    pythonProcess.stdin.write(JSON.stringify(body) + '\n');
    pythonProcess.stdin.end();

    // Listen for data from the Python process
    pythonProcess.stdout.on('data', (data) => {
      console.log('Python output:', data.toString());
      responseData += data.toString();
    });

    // Listen for errors
    pythonProcess.stderr.on('data', (data) => {
      console.error('Python error:', data.toString());
      errorData += data.toString();
    });

    // Wait for the process to exit
    const exitCode = await new Promise((resolve) => {
      pythonProcess.on('close', (code) => {
        console.log('Python process exited with code:', code);
        resolve(code);
      });
    });

    if (exitCode !== 0 || errorData) {
      console.error('Error output:', errorData);
      if (exitCode === 127) { // Command not found
        return res.status(500).json({
          error: 'Python execution failed',
          details: 'Python command not found. Please make sure Python is installed.',
          errorOutput: errorData
        });
      }
      return res.status(500).json({
        error: 'Internal server error',
        details: errorData || `Process exited with code ${exitCode}`
      });
    }

    // Try to parse the response as JSON
    try {
      const jsonResponse = JSON.parse(responseData);
      return res.status(200).json(jsonResponse);
    } catch (e) {
      // If we can't parse as JSON, just return the raw output
      console.log('Could not parse Python output as JSON:', e.message);
      return res.status(200).send(responseData || 'No output from Python script');
    }
  } catch (error) {
    console.error('Error processing request:', error);
    return res.status(500).json({
      error: 'Internal server error',
      details: error.message
    });
  }
};

// Health check endpoint
app.get('/health', (req, res) => {
  res.status(200).json({ status: 'ok' });
});

// Test endpoint
app.get('/ping', (req, res) => {
  res.status(200).send('pong');
});

// Debug endpoint
app.get('/debug', async (req, res) => {
  const pythonInfo = {};
  
  try {
    const pythonVersionCmd = spawn('python', ['--version']);
    let pythonVersion = '';
    
    pythonVersionCmd.stdout.on('data', (data) => {
      pythonVersion += data.toString();
    });
    
    await new Promise((resolve) => {
      pythonVersionCmd.on('close', (code) => {
        pythonInfo.versionExitCode = code;
        pythonInfo.version = pythonVersion.trim();
        resolve();
      });
    });
  } catch (e) {
    pythonInfo.error = e.message;
  }
  
  res.status(200).json({
    nodeVersion: process.version,
    cwd: process.cwd(),
    pythonInfo,
    serverPath: path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py'),
    serverExists: fs.existsSync(path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'server.py')),
    env: process.env.NODE_ENV || 'development'
  });
});

// Debug Python environment endpoint
app.get('/debug-python', async (req, res) => {
  try {
    // Get the path to the debug script
    const scriptPath = path.join(process.cwd(), 'src', 'mcp_server_hubspot', 'debug.py');
    console.log('Debug script path:', scriptPath);
    console.log('Debug script exists:', fs.existsSync(scriptPath));

    // Spawn a Python process to run the debug script
    const pythonProcess = spawn('python', [scriptPath]);
    
    let responseData = '';
    let errorData = '';

    // Listen for data from the Python process
    pythonProcess.stdout.on('data', (data) => {
      console.log('Python debug output:', data.toString());
      responseData += data.toString();
    });

    // Listen for errors
    pythonProcess.stderr.on('data', (data) => {
      console.error('Python debug error:', data.toString());
      errorData += data.toString();
    });

    // Wait for the process to exit
    const exitCode = await new Promise((resolve) => {
      pythonProcess.on('close', (code) => {
        console.log('Python debug process exited with code:', code);
        resolve(code);
      });
    });

    res.status(200).json({
      pythonOutput: responseData,
      pythonError: errorData,
      exitCode
    });
  } catch (error) {
    console.error('Error in debug-python endpoint:', error);
    res.status(500).json({
      error: 'Internal server error',
      details: error.message
    });
  }
});

// Echo endpoint for debugging
app.post('/echo', (req, res) => {
  const accessToken = req.headers['x-hubspot-access-token'] || req.body.accessToken || req.body.hubspotAccessToken;
  res.status(200).json({
    receivedBody: req.body,
    receivedToken: accessToken ? '[PRESENT]' : '[MISSING]'
  });
});

// Main MCP endpoint
app.post('/', handleMcpRequest);

// Start the server
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

```

--------------------------------------------------------------------------------
/src/mcp_server_hubspot/server.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Any, Dict, List, Optional
import os
from dotenv import load_dotenv
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate
from hubspot.crm.contacts.exceptions import ApiException
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import json
from datetime import datetime, timedelta
from dateutil.tz import tzlocal

logger = logging.getLogger('mcp_hubspot_server')

def convert_datetime_fields(obj: Any) -> Any:
    """Convert any datetime or tzlocal objects to string in the given object"""
    if isinstance(obj, dict):
        return {k: convert_datetime_fields(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_datetime_fields(item) for item in obj]
    elif isinstance(obj, datetime):
        return obj.isoformat()
    elif isinstance(obj, tzlocal):
        # Get the current timezone offset
        offset = datetime.now(tzlocal()).strftime('%z')
        return f"UTC{offset[:3]}:{offset[3:]}"  # Format like "UTC+08:00" or "UTC-05:00"
    return obj

class HubSpotClient:
    def __init__(self, access_token: str):
        logger.debug(f"Using access token: {'[MASKED]' if access_token else 'None'}")
        if not access_token:
            raise ValueError("HubSpot access token is required. It must be provided as an argument.")
        
        # Initialize HubSpot client with the provided token
        # This allows for multi-user support by passing user-specific tokens
        self.client = HubSpot(access_token=access_token)

    def get_contacts(self) -> str:
        """Get all contacts from HubSpot (requires optional crm.objects.contacts.read scope)"""
        try:
            contacts = self.client.crm.contacts.get_all()
            contacts_dict = [contact.to_dict() for contact in contacts]
            converted_contacts = convert_datetime_fields(contacts_dict)
            return json.dumps(converted_contacts)
        except ApiException as e:
            logger.error(f"API Exception in get_contacts: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_contacts: {str(e)}")
            return json.dumps({"error": str(e)})

    def get_companies(self) -> str:
        """Get all companies from HubSpot (requires optional crm.objects.companies.read scope)"""
        try:
            companies = self.client.crm.companies.get_all()
            companies_dict = [company.to_dict() for company in companies]
            converted_companies = convert_datetime_fields(companies_dict)
            return json.dumps(converted_companies)
        except ApiException as e:
            logger.error(f"API Exception in get_companies: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_companies: {str(e)}")
            return json.dumps({"error": str(e)})

    def get_company_activity(self, company_id: str) -> str:
        """Get activity history for a specific company (requires optional crm.objects.companies.read scope)"""
        try:
            # Note: This method only requires standard read scopes, not sensitive scopes
            # Step 1: Get all engagement IDs associated with the company using CRM Associations v4 API
            associated_engagements = self.client.crm.associations.v4.basic_api.get_page(
                object_type="companies",
                object_id=company_id,
                to_object_type="engagements",
                limit=500
            )
            
            # Extract engagement IDs from the associations response
            engagement_ids = []
            if hasattr(associated_engagements, 'results'):
                for result in associated_engagements.results:
                    engagement_ids.append(result.to_object_id)

            # Step 2: Get detailed information for each engagement
            activities = []
            for engagement_id in engagement_ids:
                engagement_response = self.client.api_request({
                    "method": "GET",
                    "path": f"/engagements/v1/engagements/{engagement_id}"
                }).json()
                
                engagement_data = engagement_response.get('engagement', {})
                metadata = engagement_response.get('metadata', {})
                
                # Format the engagement
                formatted_engagement = {
                    "id": engagement_data.get("id"),
                    "type": engagement_data.get("type"),
                    "created_at": engagement_data.get("createdAt"),
                    "last_updated": engagement_data.get("lastUpdated"),
                    "created_by": engagement_data.get("createdBy"),
                    "modified_by": engagement_data.get("modifiedBy"),
                    "timestamp": engagement_data.get("timestamp"),
                    "associations": engagement_response.get("associations", {})
                }
                
                # Add type-specific metadata formatting
                if engagement_data.get("type") == "NOTE":
                    formatted_engagement["content"] = metadata.get("body", "")
                elif engagement_data.get("type") == "EMAIL":
                    formatted_engagement["content"] = {
                        "subject": metadata.get("subject", ""),
                        "from": {
                            "raw": metadata.get("from", {}).get("raw", ""),
                            "email": metadata.get("from", {}).get("email", ""),
                            "firstName": metadata.get("from", {}).get("firstName", ""),
                            "lastName": metadata.get("from", {}).get("lastName", "")
                        },
                        "to": [{
                            "raw": recipient.get("raw", ""),
                            "email": recipient.get("email", ""),
                            "firstName": recipient.get("firstName", ""),
                            "lastName": recipient.get("lastName", "")
                        } for recipient in metadata.get("to", [])],
                        "cc": [{
                            "raw": recipient.get("raw", ""),
                            "email": recipient.get("email", ""),
                            "firstName": recipient.get("firstName", ""),
                            "lastName": recipient.get("lastName", "")
                        } for recipient in metadata.get("cc", [])],
                        "bcc": [{
                            "raw": recipient.get("raw", ""),
                            "email": recipient.get("email", ""),
                            "firstName": recipient.get("firstName", ""),
                            "lastName": recipient.get("lastName", "")
                        } for recipient in metadata.get("bcc", [])],
                        "sender": {
                            "email": metadata.get("sender", {}).get("email", "")
                        },
                        "body": metadata.get("text", "") or metadata.get("html", "")
                    }
                elif engagement_data.get("type") == "TASK":
                    formatted_engagement["content"] = {
                        "subject": metadata.get("subject", ""),
                        "body": metadata.get("body", ""),
                        "status": metadata.get("status", ""),
                        "for_object_type": metadata.get("forObjectType", "")
                    }
                elif engagement_data.get("type") == "MEETING":
                    formatted_engagement["content"] = {
                        "title": metadata.get("title", ""),
                        "body": metadata.get("body", ""),
                        "start_time": metadata.get("startTime"),
                        "end_time": metadata.get("endTime"),
                        "internal_notes": metadata.get("internalMeetingNotes", "")
                    }
                elif engagement_data.get("type") == "CALL":
                    formatted_engagement["content"] = {
                        "body": metadata.get("body", ""),
                        "from_number": metadata.get("fromNumber", ""),
                        "to_number": metadata.get("toNumber", ""),
                        "duration_ms": metadata.get("durationMilliseconds"),
                        "status": metadata.get("status", ""),
                        "disposition": metadata.get("disposition", "")
                    }
                
                activities.append(formatted_engagement)

            # Convert any datetime fields and return
            converted_activities = convert_datetime_fields(activities)
            return json.dumps(converted_activities)
            
        except ApiException as e:
            logger.error(f"API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception: {str(e)}")
            return json.dumps({"error": str(e)})

async def main(access_token: str):
    """
    Run the HubSpot MCP server.
    
    Args:
        access_token: HubSpot access token
    
    Note:
        This server requires the following HubSpot scopes:
        Required:
        - oauth
        
        Optional:
        - crm.dealsplits.read_write
        - crm.objects.companies.read
        - crm.objects.companies.write
        - crm.objects.contacts.read
        - crm.objects.contacts.write
        - crm.objects.deals.read
    """
    logger.info("Server starting")
    hubspot = HubSpotClient(access_token)
    server = Server("hubspot-manager")

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        return [
            types.Resource(
                uri=AnyUrl("hubspot://hubspot_contacts"),
                name="HubSpot Contacts",
                description="List of HubSpot contacts (requires optional crm.objects.contacts.read scope)",
                mimeType="application/json",
            ),
            types.Resource(
                uri=AnyUrl("hubspot://hubspot_companies"),
                name="HubSpot Companies", 
                description="List of HubSpot companies (requires optional crm.objects.companies.read scope)",
                mimeType="application/json",
            ),
        ]

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        if uri.scheme != "hubspot":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        path = str(uri).replace("hubspot://", "")
        if path == "hubspot_contacts":
            return str(hubspot.get_contacts())
        elif path == "hubspot_companies":
            return str(hubspot.get_companies())
        else:
            raise ValueError(f"Unknown resource path: {path}")

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="hubspot_get_contacts",
                description="Get contacts from HubSpot (requires optional crm.objects.contacts.read scope)",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
            types.Tool(
                name="hubspot_create_contact",
                description="Create a new contact in HubSpot (requires optional crm.objects.contacts.write scope)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "firstname": {"type": "string", "description": "Contact's first name"},
                        "lastname": {"type": "string", "description": "Contact's last name"},
                        "email": {"type": "string", "description": "Contact's email address"},
                        "properties": {"type": "object", "description": "Additional contact properties"}
                    },
                    "required": ["firstname", "lastname"]
                },
            ),
            types.Tool(
                name="hubspot_get_companies",
                description="Get companies from HubSpot (requires optional crm.objects.companies.read scope)",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
            types.Tool(
                name="hubspot_create_company",
                description="Create a new company in HubSpot (requires optional crm.objects.companies.write scope)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Company name"},
                        "properties": {"type": "object", "description": "Additional company properties"}
                    },
                    "required": ["name"]
                },
            ),
            types.Tool(
                name="hubspot_get_company_activity",
                description="Get activity history for a specific company (requires optional crm.objects.companies.read scope)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "company_id": {"type": "string", "description": "HubSpot company ID"}
                    },
                    "required": ["company_id"]
                },
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Handle tool execution requests"""
        try:
            if name == "hubspot_get_contacts":
                results = hubspot.get_contacts()
                return [types.TextContent(type="text", text=str(results))]

            elif name == "hubspot_create_contact":
                if not arguments:
                    raise ValueError("Missing arguments for create_contact")
                
                firstname = arguments["firstname"]
                lastname = arguments["lastname"]
                company = arguments.get("properties", {}).get("company")
                
                # Search for existing contacts with same name and company
                try:
                    from hubspot.crm.contacts import PublicObjectSearchRequest
                    
                    filter_groups = [{
                        "filters": [
                            {
                                "propertyName": "firstname",
                                "operator": "EQ",
                                "value": firstname
                            },
                            {
                                "propertyName": "lastname",
                                "operator": "EQ",
                                "value": lastname
                            }
                        ]
                    }]
                    
                    # Add company filter if provided
                    if company:
                        filter_groups[0]["filters"].append({
                            "propertyName": "company",
                            "operator": "EQ",
                            "value": company
                        })
                    
                    search_request = PublicObjectSearchRequest(
                        filter_groups=filter_groups
                    )
                    
                    search_response = hubspot.client.crm.contacts.search_api.do_search(
                        public_object_search_request=search_request
                    )
                    
                    if search_response.total > 0:
                        # Contact already exists
                        return [types.TextContent(
                            type="text", 
                            text=f"Contact already exists: {search_response.results[0].to_dict()}"
                        )]
                    
                    # If no existing contact found, proceed with creation
                    properties = {
                        "firstname": firstname,
                        "lastname": lastname
                    }
                    
                    # Add email if provided
                    if "email" in arguments:
                        properties["email"] = arguments["email"]
                    
                    # Add any additional properties
                    if "properties" in arguments:
                        properties.update(arguments["properties"])
                    
                    # Create contact using SimplePublicObjectInputForCreate
                    simple_public_object_input = SimplePublicObjectInputForCreate(
                        properties=properties
                    )
                    
                    api_response = hubspot.client.crm.contacts.basic_api.create(
                        simple_public_object_input_for_create=simple_public_object_input
                    )
                    return [types.TextContent(type="text", text=str(api_response.to_dict()))]
                    
                except ApiException as e:
                    return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]

            elif name == "hubspot_get_companies":
                results = hubspot.get_companies()
                return [types.TextContent(type="text", text=str(results))]

            elif name == "hubspot_create_company":
                if not arguments:
                    raise ValueError("Missing arguments for create_company")
                
                company_name = arguments["name"]
                
                # Search for existing companies with same name
                try:
                    from hubspot.crm.companies import PublicObjectSearchRequest
                    
                    search_request = PublicObjectSearchRequest(
                        filter_groups=[{
                            "filters": [
                                {
                                    "propertyName": "name",
                                    "operator": "EQ",
                                    "value": company_name
                                }
                            ]
                        }]
                    )
                    
                    search_response = hubspot.client.crm.companies.search_api.do_search(
                        public_object_search_request=search_request
                    )
                    
                    if search_response.total > 0:
                        # Company already exists
                        return [types.TextContent(
                            type="text", 
                            text=f"Company already exists: {search_response.results[0].to_dict()}"
                        )]
                    
                    # If no existing company found, proceed with creation
                    properties = {
                        "name": company_name
                    }
                    
                    # Add any additional properties
                    if "properties" in arguments:
                        properties.update(arguments["properties"])
                    
                    # Create company using SimplePublicObjectInputForCreate
                    simple_public_object_input = SimplePublicObjectInputForCreate(
                        properties=properties
                    )
                    
                    api_response = hubspot.client.crm.companies.basic_api.create(
                        simple_public_object_input_for_create=simple_public_object_input
                    )
                    return [types.TextContent(type="text", text=str(api_response.to_dict()))]
                    
                except ApiException as e:
                    return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]

            elif name == "hubspot_get_company_activity":
                if not arguments:
                    raise ValueError("Missing arguments for get_company_activity")
                results = hubspot.get_company_activity(arguments["company_id"])
                return [types.TextContent(type="text", text=results)]

            else:
                raise ValueError(f"Unknown tool: {name}")

        except ApiException as e:
            return [types.TextContent(type="text", text=f"HubSpot API error: {str(e)}")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="hubspot",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    import asyncio
    import sys
    if len(sys.argv) != 2:
        print("Usage: python server.py <access_token>")
        sys.exit(1)
    asyncio.run(main(sys.argv[1]))
```