This is page 3 of 3. Use http://codebase.md/hanlulong/stata-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── .gitattributes
│ ├── CLI_USAGE.md
│ └── CONTRIBUTING.md
├── .gitignore
├── .vscodeignore
├── CHANGELOG.md
├── docs
│ ├── examples
│ │ ├── auto_report.pdf
│ │ └── jupyter.ipynb
│ ├── incidents
│ │ ├── CLAUDE_CLIENTS_STREAMING_COMPARISON.md
│ │ ├── CLAUDE_CODE_NOTIFICATION_DIAGNOSIS.md
│ │ ├── CLAUDE_CODE_NOTIFICATION_ISSUE.md
│ │ ├── DUAL_TRANSPORT.md
│ │ ├── FINAL_DIAGNOSIS.md
│ │ ├── FINAL_STATUS_REPORT.md
│ │ ├── FINAL_TIMEOUT_TEST_RESULTS.md
│ │ ├── KEEP_ALIVE_IMPLEMENTATION.md
│ │ ├── LONG_EXECUTION_ISSUE.md
│ │ ├── MCP_CLIENT_VERIFICATION_SUCCESS.md
│ │ ├── MCP_ERROR_FIX.md
│ │ ├── MCP_TIMEOUT_SOLUTION.md
│ │ ├── MCP_TRANSPORT_FIX.md
│ │ ├── NOTIFICATION_FIX_COMPLETE.md
│ │ ├── NOTIFICATION_FIX_VERIFIED.md
│ │ ├── NOTIFICATION_ROUTING_BUG.md
│ │ ├── PROGRESSIVE_OUTPUT_APPROACH.md
│ │ ├── README.md
│ │ ├── SESSION_ACCESS_SOLUTION.md
│ │ ├── SSE_STREAMING_IMPLEMENTATION.md
│ │ ├── STREAMING_DIAGNOSIS.md
│ │ ├── STREAMING_IMPLEMENTATION_GUIDE.md
│ │ ├── STREAMING_SOLUTION.md
│ │ ├── STREAMING_STATUS.md
│ │ ├── STREAMING_TEST_GUIDE.md
│ │ ├── TIMEOUT_FIX_SUMMARY.md
│ │ └── TIMEOUT_TEST_REPORT.md
│ ├── jupyter-stata.md
│ ├── jupyter-stata.zh-CN.md
│ ├── release_notes.md
│ ├── release_notes.zh-CN.md
│ ├── releases
│ │ └── INSTALL_v0.3.4.md
│ └── REPO_STRUCTURE.md
├── images
│ ├── demo_2x.gif
│ ├── demo.mp4
│ ├── jupyterlab.png
│ ├── JupyterLabExample.png
│ ├── logo.png
│ ├── pystata.png
│ ├── Stata_MCP_logo_144x144.png
│ └── Stata_MCP_logo_400x400.png
├── LICENSE
├── package.json
├── README.md
├── README.zh-CN.md
├── src
│ ├── check-python.js
│ ├── devtools
│ │ ├── prepare-npm-package.js
│ │ └── restore-vscode-package.js
│ ├── extension.js
│ ├── language-configuration.json
│ ├── requirements.txt
│ ├── start-server.js
│ ├── stata_mcp_server.py
│ └── syntaxes
│   └── stata.tmLanguage.json
└── tests
  ├── README.md
  ├── simple_mcp_test.py
  ├── test_gr_list_issue.do
  ├── test_graph_issue.do
  ├── test_graph_name_param.do
  ├── test_keepalive.do
  ├── test_log_location.do
  ├── test_notifications.py
  ├── test_stata.do
  ├── test_streaming_http.py
  ├── test_streaming.do
  ├── test_timeout_direct.py
  ├── test_timeout.do
  └── test_understanding.do
```
# Files
--------------------------------------------------------------------------------
/src/stata_mcp_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Stata MCP Server - Exposes Stata functionality to AI models via MCP protocol
Using fastapi-mcp for clean implementation
"""
import os
import tempfile
import json
import sys
import time
import argparse
import logging
import platform
import signal
import subprocess
import traceback
import socket
import asyncio
from typing import Dict, Any, Optional
import warnings
import re
# --- Platform-specific process setup -----------------------------------------
# Fix encoding issues on Windows for Unicode characters
if platform.system() == 'Windows':
    # Force UTF-8 encoding for stdout and stderr on Windows.
    # The streams are re-wrapped (not reconfigured) so undecodable characters
    # are replaced instead of raising, and output stays line-buffered.
    import io
    if sys.stdout.encoding != 'utf-8':
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True)
    if sys.stderr.encoding != 'utf-8':
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True)
    # Set environment variable for Python to use UTF-8
    os.environ['PYTHONIOENCODING'] = 'utf-8'

# Hide Python process from Mac Dock (server should be background process)
if platform.system() == 'Darwin':
    try:
        from AppKit import NSApplication
        # Set activation policy to accessory - hides dock icon but allows functionality
        # This must be called early, before any GUI operations (like Stata's JVM graphics)
        app = NSApplication.sharedApplication()
        # NSApplicationActivationPolicyAccessory = 1 (hidden from dock, can show windows)
        # NSApplicationActivationPolicyProhibited = 2 (completely hidden)
        app.setActivationPolicy_(1)  # Use Accessory to allow Stata's GUI operations
    except Exception:
        # Silently ignore if AppKit not available or fails
        # This is just a UI improvement, not critical for functionality
        pass

# Check if running as a module (using -m flag): the script is __main__ but
# argv[0] does not end with this file's name.
is_running_as_module = __name__ == "__main__" and not sys.argv[0].endswith('stata_mcp_server.py')
if is_running_as_module:
    print(f"Running as a module, using modified command-line handling")

# Check Python version on Windows but don't exit immediately to allow logging
if platform.system() == 'Windows':
    required_version = (3, 11)
    current_version = (sys.version_info.major, sys.version_info.minor)
    if current_version < required_version:
        print(f"WARNING: Python 3.11 or higher is recommended on Windows. Current version: {sys.version}")
        print("Please install Python 3.11 from python.org for best compatibility.")
        # Log this but don't exit immediately so logs can be written

# --- Third-party dependencies -------------------------------------------------
# Any missing package is fatal: print installation guidance and exit with
# status 1 before the rest of the module is evaluated.
try:
    from fastapi import FastAPI, Request, Response, Query
    from fastapi.responses import StreamingResponse
    from fastapi_mcp import FastApiMCP
    from pydantic import BaseModel, Field
    from contextlib import asynccontextmanager
    import httpx
except ImportError as e:
    print(f"ERROR: Required Python packages not found: {str(e)}")
    print("Please install the required packages:")
    print("pip install fastapi uvicorn fastapi-mcp pydantic")
    # On Windows, provide more guidance
    if platform.system() == 'Windows':
        print("\nOn Windows, you can install required packages by running:")
        print("py -3.11 -m pip install fastapi uvicorn fastapi-mcp pydantic")
        print("\nIf you need to install Python 3.11, download it from: https://www.python.org/downloads/")
    # Exit with error
    sys.exit(1)

# --- Logging --------------------------------------------------------------------
# Configure logging - will be updated in main() with proper log file
# Start with basic console logging
logging.basicConfig(
    level=logging.INFO,  # Changed from DEBUG to INFO to reduce verbosity
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout  # Default to stdout until log file is configured
)
# Create console handler for debugging.
# NOTE(review): basicConfig above already attaches a stdout handler to the root
# logger, so WARNING+ records appear to be emitted by both handlers until
# logging is reconfigured - confirm against main().
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.WARNING)  # Only show WARNING level and above to keep console output clean
formatter = logging.Formatter('%(levelname)s: %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
# Silence uvicorn access logs but allow warnings
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("uvicorn.error").setLevel(logging.WARNING)

# --- Module-level state ---------------------------------------------------------
# Server info
SERVER_NAME = "Stata MCP Server"
SERVER_VERSION = "1.0.0"
# Flags for Stata availability; both are set together by try_init_stata()
stata_available = False
has_stata = False
stata = None  # Module-level reference to stata module (pystata.stata or sfi once initialized)
STATA_PATH = None  # Stata installation directory, recorded by try_init_stata()
# Add a flag to track if we've already displayed the Stata banner
stata_banner_displayed = False
# Add a flag to track if MCP server is fully initialized
mcp_initialized = False
# Add a storage for continuous command history
# (entries appended in this file are dicts with "command" and "result" keys)
command_history = []
# Store the current Stata edition
stata_edition = 'mp'  # Default to MP edition
# Store log file settings; get_log_file_path() recognizes 'extension' and
# 'custom' and treats any other value as the workspace (.do file directory)
log_file_location = 'extension'  # Default to extension directory
custom_log_directory = ''  # Custom log directory
extension_path = None  # Path to the extension directory

# Try to import pandas (optional dependency; absence only limits data transfer)
try:
    import pandas as pd
    has_pandas = True
    logging.info("pandas module loaded successfully")
except ImportError:
    has_pandas = False
    logging.warning("pandas not available, data transfer functionality will be limited")
    warnings.warn("pandas not available, data transfer functionality will be limited")

# Try to initialize Stata with the given path
def try_init_stata(stata_path):
    """Try to initialize Stata's Python integration from ``stata_path``.

    The function first tries ``pystata`` (``config.init``) and, if that
    package cannot be imported, falls back to ``stata_setup`` + ``sfi``.
    On success the module-level ``stata`` reference and the
    ``has_stata`` / ``stata_available`` flags are set.

    Args:
        stata_path: Stata installation directory. Surrounding quotes are
            stripped. A falsy value skips the path checks and relies on an
            existing ``SYSDIR_STATA`` environment variable, if any.

    Returns:
        True if Stata is (or already was) initialized, False otherwise.
        Failures are logged and printed; no exception escapes.
    """
    global stata_available, has_stata, stata, STATA_PATH, stata_banner_displayed, stata_edition

    # If Stata is already available, don't re-initialize
    if stata_available and has_stata and stata is not None:
        logging.debug("Stata already initialized, skipping re-initialization")
        return True

    # Clean the path (remove quotes if present)
    if stata_path:
        stata_path = stata_path.strip('"\'')
        STATA_PATH = stata_path
        logging.info(f"Using Stata path: {stata_path}")

    logging.info(f"Initializing Stata from path: {stata_path}")

    try:
        # Add environment variables to help with library loading
        if stata_path:
            if not os.path.exists(stata_path):
                error_msg = f"Stata path does not exist: {stata_path}"
                logging.error(error_msg)
                print(f"ERROR: {error_msg}")
                return False
            os.environ['SYSDIR_STATA'] = stata_path

        stata_utilities_path = os.path.join(os.environ.get('SYSDIR_STATA', ''), 'utilities')
        if os.path.exists(stata_utilities_path):
            # Guard against stacking duplicate entries when initialization is
            # retried after an earlier failure.
            if stata_utilities_path not in sys.path:
                sys.path.insert(0, stata_utilities_path)
            logging.debug(f"Added Stata utilities path to sys.path: {stata_utilities_path}")
        else:
            warning_msg = f"Stata utilities path not found: {stata_utilities_path}"
            logging.warning(warning_msg)

        # Try to import pystata or stata-sfi
        try:
            # First try pystata
            from pystata import config
            logging.debug("Successfully imported pystata")

            # Try to initialize Stata
            try:
                # Only show banner once (suppress if we've shown it before)
                if not stata_banner_displayed and platform.system() == 'Windows':
                    # On Windows, the banner appears even if we try to suppress it
                    # At least mark that we've displayed it
                    stata_banner_displayed = True
                    logging.debug("Stata banner will be displayed (first time)")
                else:
                    # On subsequent initializations, try to suppress the banner
                    # This doesn't always work on Windows, but at least we're trying
                    logging.debug("Attempting to suppress Stata banner on re-initialization")
                    os.environ['STATA_QUIETLY'] = '1'

                # Set Java headless mode to prevent Dock icon on Mac (must be before config.init)
                # When Stata's embedded JVM initializes for graphics, it normally creates a Dock icon
                if platform.system() == 'Darwin':
                    headless_flag = '-Djava.awt.headless=true'
                    existing_java_opts = os.environ.get('JAVA_TOOL_OPTIONS', '')
                    # Append rather than overwrite so user-supplied JVM options survive
                    if headless_flag not in existing_java_opts:
                        os.environ['JAVA_TOOL_OPTIONS'] = f"{existing_java_opts} {headless_flag}".strip()
                    logging.debug("Set Java headless mode to prevent Dock icon")

                # Initialize with the specified Stata edition
                config.init(stata_edition)
                logging.info(f"Stata initialized successfully with {stata_edition.upper()} edition")

                # Fix encoding for PyStata output on Windows
                if platform.system() == 'Windows':
                    import io
                    # Replace PyStata's output file handle with UTF-8 encoded version
                    config.stoutputf = io.TextIOWrapper(
                        sys.stdout.buffer,
                        encoding='utf-8',
                        errors='replace',
                        line_buffering=True
                    )
                    logging.debug("Configured PyStata output with UTF-8 encoding for Windows")

                # Now import stata after initialization
                from pystata import stata as stata_module
                # Set module-level stata reference
                stata = stata_module

                # Successfully initialized Stata
                has_stata = True
                stata_available = True

                # Warm up PNG export so the graphics JVM is initialized here rather
                # than on first use from a daemon thread. Per the original author's
                # note this avoids a SIGBUS crash on macOS (details referenced in
                # tests/MAC_SPECIFIC_ANALYSIS.md - NOTE(review): that file is not in
                # the visible tree; confirm it exists).
                try:
                    from pystata.config import stlib, get_encode_str

                    png_init = os.path.join(tempfile.gettempdir(), "_stata_png_init.png")
                    warmup_commands = (
                        # Create minimal dataset and graph (2 obs, 1 var)
                        "qui clear",
                        "qui set obs 2",
                        "qui gen x=1",
                        "qui twoway scatter x x, name(_init, replace)",
                        # Export tiny PNG (10x10px) to initialize the JVM
                        f'qui graph export "{png_init}", name(_init) replace width(10) height(10)',
                        "qui graph drop _init",
                        # Do not leave the scratch dataset in the user's session
                        "qui clear",
                    )
                    for warmup_command in warmup_commands:
                        stlib.StataSO_Execute(get_encode_str(warmup_command), False)

                    # Cleanup temporary files
                    if os.path.exists(png_init):
                        os.unlink(png_init)
                    logging.debug("PNG export initialized successfully (Mac JVM fix)")
                except Exception as png_init_error:
                    # Non-fatal: log but continue - PNG may still work on some platforms
                    logging.warning(f"PNG initialization failed (non-fatal): {str(png_init_error)}")

                return True
            except Exception as init_error:
                error_msg = f"Failed to initialize Stata: {str(init_error)}"
                logging.error(error_msg)
                print(f"ERROR: {error_msg}")
                print("Will attempt to continue without full Stata integration")
                print("Check if Stata is already running in another instance, or if your Stata license is valid")
                # Some features will still work without full initialization
                has_stata = False
                stata_available = False
                return False
        except ImportError:
            # pystata itself is not importable: try stata-sfi as fallback
            try:
                import stata_setup

                # Only show banner once
                if not stata_banner_displayed and platform.system() == 'Windows':
                    stata_banner_displayed = True
                    logging.debug("Stata banner will be displayed (first time)")
                else:
                    # On subsequent initializations, try to suppress the banner
                    logging.debug("Attempting to suppress Stata banner on re-initialization")
                    os.environ['STATA_QUIETLY'] = '1'

                stata_setup.config(stata_path, stata_edition)
                logging.debug("Successfully configured stata_setup")

                try:
                    import sfi
                    # Set module-level stata reference for compatibility
                    stata = sfi
                    has_stata = True
                    stata_available = True
                    logging.info("Stata initialized successfully using sfi")
                    return True
                except ImportError as sfi_error:
                    error_msg = f"Could not import sfi: {str(sfi_error)}"
                    logging.error(error_msg)
                    print(f"ERROR: {error_msg}")
                    has_stata = False
                    stata_available = False
                    return False
            except Exception as setup_error:
                error_msg = f"Could not import pystata or sfi: {str(setup_error)}"
                logging.error(error_msg)
                print(f"ERROR: {error_msg}")
                print("Stata commands will not be available")
                has_stata = False
                stata_available = False
                return False
    except Exception as e:
        error_msg = f"General error setting up Stata environment: {str(e)}"
        logging.error(error_msg)
        print(f"ERROR: {error_msg}")
        print("Stata commands will not be available")
        print(f"Check if the Stata path is correct: {stata_path}")
        print("And ensure Stata is properly licensed and not running in another process")
        has_stata = False
        stata_available = False
        return False
# Lock file mechanism removed - VS Code/Cursor handles extension instances properly
# If there are port conflicts, the server will fail to start cleanly
def get_log_file_path(do_file_path, do_file_base):
    """Get the appropriate log file path based on user settings.

    The log file is always named ``<do_file_base>_mcp.log``; only its
    directory depends on the module-level ``log_file_location`` setting:

    * ``'extension'`` - ``<extension_path>/logs`` (created on demand);
    * ``'custom'`` - ``custom_log_directory`` when it is an existing directory;
    * anything else - the directory containing the .do file (workspace).

    Whenever the preferred directory is unavailable, the workspace location
    is used instead.

    Args:
        do_file_path: Path of the .do file being executed.
        do_file_base: Base name (no extension) used to build the log file name.

    Returns:
        An absolute path, so the log file lands in the intended location
        regardless of Stata's working directory.
    """
    global log_file_location, custom_log_directory, extension_path

    log_name = f"{do_file_base}_mcp.log"
    # Original behavior and universal fallback: next to the .do file
    workspace_log = os.path.abspath(os.path.join(os.path.dirname(do_file_path), log_name))

    if log_file_location == 'extension':
        if not extension_path:
            return workspace_log
        logs_dir = os.path.join(extension_path, 'logs')
        try:
            os.makedirs(logs_dir, exist_ok=True)
        except OSError as dir_error:
            # e.g. read-only extension install: degrade instead of failing the run
            logging.warning(f"Could not create log directory {logs_dir}: {dir_error}, falling back to workspace")
            return workspace_log
        return os.path.abspath(os.path.join(logs_dir, log_name))

    if log_file_location == 'custom':
        # isdir (not exists): a regular file must not be accepted as a directory
        if custom_log_directory and os.path.isdir(custom_log_directory):
            return os.path.abspath(os.path.join(custom_log_directory, log_name))
        logging.warning(f"Custom log directory not valid: {custom_log_directory}, falling back to workspace")
        return workspace_log

    return workspace_log
def resolve_do_file_path(file_path: str) -> tuple[Optional[str], list[str]]:
    """Resolve a .do file path to an absolute location.

    An absolute path is checked as-is. A relative path is tried, in order,
    as given, joined to the current working directory, by base name in the
    working directory, with swapped path separators (Windows only) and
    finally by base name in subdirectories up to two levels below the
    working directory. A candidate matches only if it is an existing file
    whose name ends in ``.do`` (case-insensitive).

    Args:
        file_path: Absolute or relative path to a .do file.

    Returns:
        A tuple of (resolved_path, tried_paths). resolved_path is None if the
        file could not be located. tried_paths contains the normalized,
        de-duplicated paths that were examined, in order, up to and including
        the match.
    """
    original_path = file_path
    normalized_path = os.path.normpath(file_path)

    # Normalize Windows paths to use backslashes for consistency
    if platform.system() == "Windows" and '/' in normalized_path:
        normalized_path = normalized_path.replace('/', '\\')
        logging.info(f"Converted path for Windows: {normalized_path}")

    tried_paths: list[str] = []
    seen: set[str] = set()

    def _probe(candidate: str) -> Optional[str]:
        """Record an unseen candidate and return its absolute path if it is a .do file."""
        candidate = os.path.normpath(candidate)
        if candidate in seen:
            return None
        seen.add(candidate)
        tried_paths.append(candidate)
        if os.path.isfile(candidate) and candidate.lower().endswith('.do'):
            resolved = os.path.abspath(candidate)
            logging.info(f"Found file at: {resolved}")
            return resolved
        return None

    if os.path.isabs(normalized_path):
        return _probe(normalized_path), tried_paths

    cwd = os.getcwd()
    logging.info(f"File path is not absolute. Current working directory: {cwd}")
    target_name = os.path.basename(normalized_path)

    direct_candidates = [
        normalized_path,
        os.path.join(cwd, normalized_path),
        os.path.join(cwd, target_name),
    ]
    if platform.system() == "Windows":
        if '/' in original_path:
            win_path = original_path.replace('/', '\\')
            direct_candidates += [win_path, os.path.join(cwd, win_path)]
        elif '\\' in original_path:
            unix_path = original_path.replace('\\', '/')
            direct_candidates += [unix_path, os.path.join(cwd, unix_path)]

    for candidate in direct_candidates:
        resolved = _probe(candidate)
        if resolved is not None:
            return resolved, tried_paths

    # Only now pay for the directory walk: search subdirectories up to two
    # levels deep for a file with the same base name.
    for root, dirs, files in os.walk(cwd, topdown=True, followlinks=False):
        # Depth via relpath is correct even when cwd is the filesystem root
        # or its text happens to recur inside a deeper path.
        depth = 0 if root == cwd else os.path.relpath(root, cwd).count(os.sep) + 1
        if depth >= 2:
            dirs[:] = []  # prune: do not descend below the second level
        if root != cwd and target_name in files:
            resolved = _probe(os.path.join(root, target_name))
            if resolved is not None:
                return resolved, tried_paths

    return None, tried_paths
def get_stata_path():
    """Locate the Stata executable under the configured ``STATA_PATH``.

    Returns:
        None when no Stata path is configured; otherwise the first existing
        platform-specific executable. If none exists, Windows gets the
        conventional ``StataMP.exe`` location and other platforms get
        ``STATA_PATH`` itself (after logging an error).
    """
    global STATA_PATH

    base_dir = STATA_PATH
    if not base_dir:
        return None

    system_name = platform.system()

    # Build the ordered list of places the executable may live on this platform
    if system_name == "Windows":
        windows_names = ("StataMP-64.exe", "StataMP.exe", "StataSE-64.exe",
                         "StataSE.exe", "Stata-64.exe", "Stata.exe")
        search_order = [os.path.join(base_dir, exe_name) for exe_name in windows_names]
    elif system_name == "Darwin":
        mac_variants = ("StataMP", "StataSE", "Stata")
        if base_dir.endswith(".app"):
            # The configured path is itself an app bundle
            search_order = [
                os.path.join(base_dir, "Contents", "MacOS", variant)
                for variant in mac_variants
            ]
        else:
            # The configured path is a directory: per variant, look inside a
            # nested app bundle first, then for a bare executable
            search_order = []
            for variant in mac_variants:
                bundle = os.path.join(base_dir, f"{variant}.app")
                search_order.append(os.path.join(bundle, "Contents", "MacOS", variant))
                search_order.append(os.path.join(base_dir, variant))
    else:
        # Linux - executable should be inside the path directly
        search_order = [
            os.path.join(base_dir, variant)
            for variant in ("stata-mp", "stata-se", "stata")
        ]

    located = next((path for path in search_order if os.path.exists(path)), None)
    if located is not None:
        return located

    if system_name == "Windows":
        # If no specific executable found, use the default path with StataMP.exe
        return os.path.join(base_dir, "StataMP.exe")

    # If we get here, we couldn't find the executable
    logging.error(f"Could not find Stata executable in {STATA_PATH}")
    return base_dir  # Return the base path as fallback
def check_stata_installed():
    """Check if Stata is installed and available.

    Returns:
        True when the in-process Python integration is initialized (the
        availability flag is set and the module-level ``stata`` reference is
        populated), or when ``get_stata_path()`` points at an existing path
        that is executable (the executable bit is not checked on Windows).
        False otherwise.
    """
    global stata_available

    # First check if we have working Python integration. The module always
    # defines ``stata`` (initially None), so test the value, not the name.
    if stata_available and globals().get('stata') is not None:
        return True

    # Otherwise check for executable
    stata_path = get_stata_path()
    if not stata_path or not os.path.exists(stata_path):
        return False

    # On non-Windows, check if it's executable
    if platform.system() != "Windows" and not os.access(stata_path, os.X_OK):
        return False

    return True
# Function to run a Stata command
def _quote_do_file_command(command: str) -> str:
    """Rewrite a single-line ``do <path>`` command with a normalized, double-quoted path."""
    parts = command.split(' ', 1)
    if len(parts) < 2:
        return command
    file_path = parts[1].strip()
    # Remove any existing quotes
    if (file_path.startswith('"') and file_path.endswith('"')) or \
            (file_path.startswith("'") and file_path.endswith("'")):
        file_path = file_path[1:-1]
    # Normalize path for OS
    file_path = os.path.normpath(file_path)
    # On Windows, make sure backslashes are used
    if platform.system() == "Windows" and '/' in file_path:
        file_path = file_path.replace('/', '\\')
        logging.debug(f"Converted path for Windows: {file_path}")
    # For Stata's do command, ALWAYS use double quotes regardless of platform
    command = f'do "{file_path}"'
    logging.debug(f"Reformatted 'do' command: {command}")
    return command


def _comment_out_cls(command: str):
    """Return (text, count): the command with standalone ``cls`` lines commented out, newline-terminated."""
    processed_lines = []
    cls_count = 0
    for line in command.splitlines():
        if re.match(r'^\s*cls\s*$', line, re.IGNORECASE):
            processed_lines.append(f"* COMMENTED OUT BY MCP: {line}\n")
            cls_count += 1
        else:
            processed_lines.append(f"{line}\n")
    return "".join(processed_lines), cls_count


def _extract_log_output(log_content: str):
    """Return the non-blank log lines between the first echoed user command and the closing marker."""
    lines = log_content.strip().split('\n')

    # Output starts right after the first echoed command (a line starting with
    # a dot) that is not part of the log bookkeeping.
    start_index = 0
    for i, line in enumerate(lines):
        if line.strip().startswith('.') and 'log ' not in line and 'capture log close' not in line:
            start_index = i + 1
            break

    # Output ends at the last "capture log close" / "end of do-file" marker
    end_index = len(lines)
    for i in range(len(lines) - 1, 0, -1):
        if 'capture log close' in lines[i] or 'end of do-file' in lines[i]:
            end_index = i
            break

    # Blank lines are dropped; command lines (starting with '.') are kept
    return [line.rstrip() for line in lines[start_index:end_index] if line.strip()]


# Function to run a Stata command
def run_stata_command(command: str, clear_history=False, auto_detect_graphs=False):
    """Run a Stata command through pystata and return its captured output.

    The command is written to a temporary .do file that logs to a sibling
    ``.log`` file; the do-file is executed with console output silenced and
    the log is then read back and trimmed to the command's output. Both
    temporary files are removed on every exit path.

    Args:
        command: The Stata command to run (may span several lines).
        clear_history: Whether to clear command history. With an empty
            command this is a pure "clear" request and '' is returned.
        auto_detect_graphs: Whether to detect and export graphs after execution
            (default: False for MCP/LLM calls).

    Returns:
        The command output, a "no output" notice, or an error message string.
        Errors are reported in the return value, not raised.

    Note: This function manually enables _gr_list on before execution and detects graphs after.
    We do NOT use inline=True because it calls _gr_list off at the end, clearing our graph list!
    This function is only called from /v1/tools endpoint which is excluded from MCP.
    """
    global stata_available, has_stata, command_history

    # Only log at debug level instead of info to reduce verbosity
    logging.debug(f"Running Stata command: {command}")

    # Clear history if requested
    if clear_history:
        logging.info(f"Clearing command history (had {len(command_history)} items)")
        command_history = []
        # If it's just a clear request with no command, return empty
        if not command or command.strip() == '':
            logging.info("Clear history request completed")
            return ''

    # For multi-line commands, don't add semicolons - just clean up whitespace
    if "\n" in command:
        command = "\n".join(line.strip() for line in command.splitlines() if line.strip())
        logging.debug(f"Processed multiline command: {command}")

    # Special handling for 'do' commands with file paths. Only a single-line
    # command is rewritten: in a multi-line selection everything after the
    # first "do " would otherwise be quoted as if it were one file path.
    if command.lower().startswith('do ') and "\n" not in command:
        command = _quote_do_file_command(command)

    if not (has_stata and stata_available):
        error_msg = "Stata is not available. Please check if Stata is installed and configured correctly."
        logging.error(error_msg)
        # Add to command history
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        command_history.append({"command": f"[{timestamp}] {command}", "result": error_msg})
        return error_msg

    do_file = None
    log_file = None
    try:
        # Enable graph listing for this command using low-level API
        try:
            from pystata.config import stlib, get_encode_str
            logging.debug("Enabling graph listing with _gr_list on...")
            stlib.StataSO_Execute(get_encode_str("qui _gr_list on"), False)
            logging.debug("Successfully enabled graph listing")
        except Exception as e:
            logging.warning(f"Could not enable graph listing: {str(e)}")
            logging.debug(f"Graph listing enable error: {traceback.format_exc()}")

        # Initialize graphs list (will be populated if graphs are found)
        graphs_from_interactive = []

        # cls would wipe the results window, so such lines are commented out
        processed_command, cls_commands_found = _comment_out_cls(command)
        if cls_commands_found > 0:
            logging.info(f"Found and commented out {cls_commands_found} cls commands in the selection")

        # Create a temp do-file that wraps the command in a text log
        with tempfile.NamedTemporaryFile(
            suffix='.do', delete=False, mode='w', encoding='utf-8'
        ) as f:
            do_file = f.name
            log_file = f"{do_file}.log"
            f.write("capture log close _all\n")
            f.write(f"log using \"{log_file}\", replace text\n")
            f.write(processed_command)
            f.write("capture log close\n")

        # Always use double quotes for the do file path for PyStata
        run_cmd = f"do \"{do_file}\""

        try:
            # Redirect stdout temporarily to silence Stata output
            original_stdout = sys.stdout
            sys.stdout = open(os.devnull, 'w')
            try:
                # Use inline=False because inline=True calls _gr_list off at the end!
                globals()['stata'].run(run_cmd, echo=False, inline=False)
                logging.debug(f"Command executed successfully via pystata: {run_cmd}")
            except Exception as e:
                # If command fails, try to reinitialize Stata once
                logging.warning(f"Stata command failed, attempting to reinitialize: {str(e)}")
                if not STATA_PATH:
                    logging.error(f"No Stata path available for reinitialization")
                    raise
                if not try_init_stata(STATA_PATH):
                    logging.error(f"Failed to reinitialize Stata")
                    raise
                # Retry the command if reinitialization succeeded
                try:
                    globals()['stata'].run(run_cmd, echo=False, inline=False)
                    logging.info(f"Command succeeded after Stata reinitialization")
                except Exception as retry_error:
                    logging.error(f"Command still failed after reinitializing Stata: {str(retry_error)}")
                    raise
            finally:
                # Restore stdout
                sys.stdout.close()
                sys.stdout = original_stdout

            # Only detect and export graphs if enabled (not from LLM/MCP)
            if auto_detect_graphs:
                # Check immediately after stata.run(), while graphs are still in memory
                try:
                    logging.debug("Checking for graphs immediately after execution (interactive mode)...")
                    graphs_from_interactive = display_graphs_interactive(graph_format='png', width=800, height=600)
                    if graphs_from_interactive:
                        logging.info(f"Captured {len(graphs_from_interactive)} graphs in interactive mode")
                except Exception as graph_err:
                    logging.warning(f"Could not capture graphs in interactive mode: {str(graph_err)}")
        except Exception as exec_error:
            error_msg = f"Error running command: {str(exec_error)}"
            logging.error(error_msg)
            return error_msg

        # Read the log file
        logging.debug(f"Reading log file: {log_file}")

        # Wait for the log file to be written (up to 10 polls of 0.3 s)
        max_attempts = 10
        attempts = 0
        while not os.path.exists(log_file) and attempts < max_attempts:
            time.sleep(0.3)
            attempts += 1

        if not os.path.exists(log_file):
            logging.error(f"Log file not created: {log_file}")
            return "Command executed but no output was captured"

        # Wait a moment for file writing to complete
        time.sleep(0.5)

        try:
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                log_content = f.read()

            result_lines = _extract_log_output(log_content)

            # Return properly formatted output
            if not result_lines:
                result = "Command executed successfully (no output)"
            else:
                result = "\n".join(result_lines)

            # Use graphs captured in interactive mode (if any)
            if graphs_from_interactive:
                graph_info = "\n\n" + "="*60 + "\n"
                graph_info += f"GRAPHS DETECTED: {len(graphs_from_interactive)} graph(s) created\n"
                graph_info += "="*60 + "\n"
                for graph in graphs_from_interactive:
                    # Include command if available, using special format for JavaScript parsing
                    if 'command' in graph and graph['command']:
                        graph_info += f" • {graph['name']}: {graph['path']} [CMD: {graph['command']}]\n"
                    else:
                        graph_info += f" • {graph['name']}: {graph['path']}\n"
                result += graph_info
                logging.info(f"Added {len(graphs_from_interactive)} graphs to output (from interactive mode)")
            else:
                logging.debug("No graphs were captured in interactive mode")

            # For interactive window, just return the current result
            # The client will handle displaying history
            return result
        except Exception as e:
            error_msg = f"Error reading log file: {str(e)}"
            logging.error(error_msg)
            return error_msg
    except Exception as e:
        error_msg = f"Error executing Stata command: {str(e)}"
        logging.error(error_msg)
        # Add to command history
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        command_history.append({"command": f"[{timestamp}] {command}", "result": error_msg})
        return error_msg
    finally:
        # Disable graph listing on every exit path, not only after success
        try:
            from pystata.config import stlib, get_encode_str
            stlib.StataSO_Execute(get_encode_str("qui _gr_list off"), False)
            logging.debug("Disabled graph listing")
        except Exception as e:
            logging.warning(f"Could not disable graph listing: {str(e)}")

        # Clean up temporary files, including after failures
        for temp_path in (do_file, log_file):
            if temp_path is None:
                continue
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass  # e.g. the log was never created because execution failed
            except OSError as e:
                logging.warning(f"Could not delete temporary files: {str(e)}")
def detect_and_export_graphs():
    """Export every graph Stata currently lists to a PNG file.

    The graph names are read from ``r(_grlist)`` after running ``_gr_list list``
    through the low-level API; each graph is displayed and then exported at
    800x600 into ``<extension_path>/graphs`` (or a ``stata_mcp_graphs`` folder
    under the system temp directory when no extension path is set).

    Returns:
        List of dictionaries with graph info:
        [{"name": "graph1", "path": "/path/to/graph.png"}, ...].
        Empty when Stata is unavailable, no graph is listed, or detection fails.
    """
    if not has_stata or not stata_available:
        return []

    try:
        import sfi
        from pystata.config import get_encode_str, stlib

        def execute(stata_cmd):
            # Run one command through the low-level API and hand back its return code
            return stlib.StataSO_Execute(get_encode_str(stata_cmd), False)

        logging.debug("Checking for graphs using _gr_list (low-level API)...")
        # _gr_list is expected to have been switched on before the user's command ran
        list_rc = execute("qui _gr_list list")
        logging.debug(f"_gr_list list returned rc={list_rc}")

        raw_names = sfi.Macro.getGlobal("r(_grlist)")
        logging.debug(f"r(_grlist) returned: '{raw_names}' (type: {type(raw_names)}, length: {len(raw_names) if raw_names else 0})")
        if not raw_names:
            logging.debug("No graphs found (gnamelist is empty)")
            return []

        names = raw_names.split()
        logging.info(f"Found {len(names)} graph(s): {names}")

        # Prefer the extension's own folder; fall back to the system temp directory
        if extension_path:
            target_dir = os.path.join(extension_path, 'graphs')
        else:
            target_dir = os.path.join(tempfile.gettempdir(), 'stata_mcp_graphs')
        os.makedirs(target_dir, exist_ok=True)
        logging.debug(f"Exporting graphs to: {target_dir}")

        exported = []
        for gname in names:
            try:
                # A graph has to be displayed before it can be exported.
                # Graph names are Stata names, so they are passed unquoted.
                display_rc = execute(f'qui graph display {gname}')
                if display_rc != 0:
                    logging.warning(f"Failed to display graph '{gname}' (rc={display_rc})")
                    continue

                png_path = os.path.join(target_dir, f'{gname}.png')
                export_cmd = f'qui graph export "{png_path}", name({gname}) replace width(800) height(600)'
                logging.debug(f"Executing graph export command: {export_cmd}")
                export_rc = execute(export_cmd)
                if export_rc != 0:
                    logging.warning(f"Failed to export graph '{gname}' (rc={export_rc})")
                    continue

                # rc == 0 alone is not trusted: only report files that really exist
                if not os.path.exists(png_path):
                    logging.warning(f"Failed to export graph '{gname}' - file not created")
                    continue

                exported.append({"name": gname, "path": png_path})
                logging.info(f"Exported graph '{gname}' to {png_path}")
            except Exception as e:
                # One broken graph must not prevent the others from being exported
                logging.error(f"Error exporting graph '{gname}': {str(e)}")
        return exported
    except Exception as e:
        logging.error(f"Error detecting graphs: {str(e)}")
        return []
def display_graphs_interactive(graph_format='png', width=800, height=600):
    """Display graphs using PyStata's interactive approach (similar to Jupyter)

    This function mimics PyStata's grdisplay.py approach for exporting graphs.
    It should be called immediately after command execution while graphs are still in memory.

    Args:
        graph_format: Format for exported graphs ('svg', 'png', or 'pdf').
            Any other value is exported as PNG and reported as 'png'.
        width: Export width. Passed as width() for png and svg; for pdf it is
            divided by 96 and passed as xsize().
        height: Export height. Passed as height() for png and svg; for pdf it is
            divided by 96 and passed as ysize().

    Returns:
        List of dictionaries with graph info:
        [{"name": "graph1", "path": "/path/to/graph.png", "format": "png"}, ...].
        No "command" key is set here; callers treat it as optional.
        Empty when Stata is unavailable, no graph is listed, or detection fails.
    """
    global stata_available, has_stata, extension_path
    if not (has_stata and stata_available):
        return []
    try:
        import sfi
        from pystata.config import stlib, get_encode_str

        # Unknown formats have always been written as PNG; normalise the value so the
        # reported "format" can never disagree with the file that is actually written.
        if graph_format not in ('svg', 'pdf', 'png'):
            logging.warning(f"Unsupported graph format '{graph_format}', exporting as png instead")
            graph_format = 'png'

        # Use the same approach as PyStata's grdisplay.py
        logging.debug(f"Interactive graph display: checking for graphs (format: {graph_format})...")
        # Get the list of graphs (_gr_list should already be on from before file execution)
        rc = stlib.StataSO_Execute(get_encode_str("qui _gr_list list"), False)
        logging.debug(f"_gr_list list returned rc={rc}")
        gnamelist = sfi.Macro.getGlobal("r(_grlist)")
        logging.debug(f"r(_grlist) returned: '{gnamelist}' (type: {type(gnamelist)}, length: {len(gnamelist) if gnamelist else 0})")
        if not gnamelist:
            logging.debug("No graphs found in interactive mode")
            return []

        graph_names = gnamelist.split()
        logging.info(f"Found {len(graph_names)} graph(s) in interactive mode: {graph_names}")

        # Create graphs directory
        if extension_path:
            graphs_dir = os.path.join(extension_path, 'graphs')
        else:
            graphs_dir = os.path.join(tempfile.gettempdir(), 'stata_mcp_graphs')
        os.makedirs(graphs_dir, exist_ok=True)
        logging.debug(f"Exporting graphs to: {graphs_dir}")

        # The size options do not depend on the graph, so build them once
        if graph_format == 'pdf':
            # For PDF, use xsize/ysize instead of width/height
            size_options = f' xsize({width/96:.2f}) ysize({height/96:.2f})' if width and height else ''
        elif width and height:
            size_options = f' width({width}) height({height})'
        elif graph_format == 'png':
            # PNG always gets an explicit size, even when the caller gave none
            size_options = ' width(800) height(600)'
        else:
            size_options = ''

        graphs_info = []
        # Export each graph using PyStata's approach
        for gname in graph_names:
            try:
                # Display the graph first (required before export)
                # Stata graph names should not be quoted in graph display command
                gph_disp = f'qui graph display {gname}'
                logging.debug(f"Displaying graph: {gph_disp}")
                rc = stlib.StataSO_Execute(get_encode_str(gph_disp), False)
                if rc != 0:
                    logging.warning(f"Failed to display graph '{gname}' (rc={rc})")
                    continue

                # The file extension is the (normalised) format name
                graph_file = os.path.join(graphs_dir, f'{gname}.{graph_format}')
                gph_exp = f'qui graph export "{graph_file}", name({gname}) replace{size_options}'

                # Export the graph
                logging.debug(f"Exporting graph: {gph_exp}")
                rc = stlib.StataSO_Execute(get_encode_str(gph_exp), False)
                if rc != 0:
                    logging.warning(f"Failed to export graph '{gname}' (rc={rc})")
                    continue

                # Only report graphs whose file really exists on disk
                if os.path.exists(graph_file):
                    graphs_info.append({
                        "name": gname,
                        "path": graph_file,
                        "format": graph_format
                    })
                    logging.info(f"Exported graph '{gname}' to {graph_file} (format: {graph_format})")
                else:
                    logging.warning(f"Graph file not found after export: {graph_file}")
            except Exception as e:
                # One failing graph must not stop the remaining exports
                logging.error(f"Error exporting graph '{gname}': {str(e)}")
                continue
        return graphs_info
    except Exception as e:
        logging.error(f"Error in interactive graph display: {str(e)}")
        logging.debug(f"Interactive display error details: {traceback.format_exc()}")
        return []
def run_stata_selection(selection, working_dir=None, auto_detect_graphs=False):
    """Execute a snippet of Stata code, optionally from a given directory.

    Args:
        selection: The Stata code to run
        working_dir: Directory to ``cd`` into first; ignored unless it is an existing directory
        auto_detect_graphs: Whether to detect and export graphs (default: False for MCP/LLM calls)

    Returns:
        Whatever run_stata_command returns for the (possibly cd-prefixed) code.
    """
    code_to_run = selection

    if working_dir and os.path.isdir(working_dir):
        logging.info(f"Changing working directory to: {working_dir}")
        # Normalize the path for the current OS
        target_dir = os.path.normpath(working_dir)
        if platform.system() == "Windows":
            # Make sure only backslashes are left on Windows
            target_dir = target_dir.replace('/', '\\')
        # Double quotes keep directories containing spaces intact
        code_to_run = f'cd "{target_dir}"\n{selection}'

    return run_stata_command(code_to_run, auto_detect_graphs=auto_detect_graphs)
def run_stata_file(file_path: str, timeout=600, auto_name_graphs=False):
    """Run a Stata .do file with improved handling for long-running processes

    The file itself is never executed. A temporary copy is written in which the
    user's log and cls commands are commented out (and, optionally, unnamed graph
    commands get a name() option); the copy opens its own log at the configured
    location and is executed in a worker thread while this function polls for
    timeout and progress.

    Args:
        file_path: The path to the .do file to run
        timeout: Timeout in seconds (default: 600 seconds / 10 minutes)
        auto_name_graphs: Whether to automatically add names to graphs and report
            exported graphs in the result (default: False for MCP/LLM calls)

    Returns:
        A text report (log output, warnings, graph list) or an error message.
        Failures are reported in the returned string rather than raised.
    """
    # Function-scope imports belong at the top: an import placed inside an except block
    # would make the name local to the whole function and leave it unbound on every
    # path that never entered that block.
    import threading
    import traceback

    # Set timeout from parameter instead of hardcoding
    MAX_TIMEOUT = timeout

    def missing_file_error(message):
        # Log the error and, on Windows, append the usual path troubleshooting hints
        logging.error(message)
        if platform.system() == "Windows":
            message += "\n\nCommon Windows path issues:\n"
            message += "1. Make sure the file path uses correct separators (use \\ instead of /)\n"
            message += "2. Check if the file exists in the specified location\n"
            message += "3. If using relative paths, the current working directory is: " + os.getcwd()
        return message

    try:
        original_path = file_path
        resolved_path, tried_paths = resolve_do_file_path(file_path)
        if not resolved_path:
            tried_display = ', '.join(tried_paths) if tried_paths else os.path.normpath(file_path)
            return missing_file_error(
                f"Error: File not found: {original_path}. Tried these paths: {tried_display}"
            )
        file_path = resolved_path

        # Verify file exists (final check)
        if not os.path.exists(file_path):
            return missing_file_error(f"Error: File not found: {file_path}")

        # Check file extension
        if not file_path.lower().endswith('.do'):
            error_msg = f"Error: File must be a Stata .do file with .do extension: {file_path}"
            logging.error(error_msg)
            return error_msg

        logging.info(f"Running Stata do file: {file_path}")
        # Ensure file_path is absolute for consistent behavior
        file_path = os.path.abspath(file_path)
        do_file_dir = os.path.dirname(file_path)
        do_file_base = os.path.splitext(os.path.basename(file_path))[0]

        # The log path is absolute, so it can live anywhere regardless of
        # Stata's current working directory
        custom_log_file = get_log_file_path(file_path, do_file_base)
        logging.info(f"Will save log to: {custom_log_file}")

        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                do_file_content = f.read()

            # Build the modified do-file line by line
            modified_lines = []
            log_commands_found = 0
            cls_commands_found = 0
            graph_counter = 0
            for line in do_file_content.splitlines():
                # The user's own log commands would fight with the log opened below
                if re.match(r'^\s*(log\s+using|log\s+close|capture\s+log\s+close)', line, re.IGNORECASE):
                    modified_lines.append(f"* COMMENTED OUT BY MCP: {line}")
                    log_commands_found += 1
                    continue
                # cls is commented out as well
                if re.match(r'^\s*cls\s*$', line, re.IGNORECASE):
                    modified_lines.append(f"* COMMENTED OUT BY MCP: {line}")
                    cls_commands_found += 1
                    continue
                # Only auto-name graphs if called from VS Code extension (not from LLM/MCP)
                if auto_name_graphs:
                    # Match: scatter, histogram, twoway, kdensity, graph bar/box/dot/etc (but not graph export)
                    graph_match = re.match(r'^(\s*)(scatter|histogram|twoway|kdensity|graph\s+(bar|box|dot|pie|matrix|hbar|hbox|combine))\s+(.*)$', line, re.IGNORECASE)
                    if graph_match:
                        indent = graph_match.group(1)
                        graph_cmd = graph_match.group(2)
                        rest = graph_match.group(4)
                        # Leave graphs alone that already carry a name() option
                        if not re.search(r'\bname\s*\(', rest, re.IGNORECASE):
                            graph_counter += 1
                            graph_name = f"graph{graph_counter}"
                            rest = _add_graph_name_option(rest, graph_name)
                            modified_lines.append(f"{indent}{graph_cmd} {rest}")
                            logging.debug(f"Auto-named graph: {graph_name}")
                            continue
                # Keep line as-is (including graph export commands)
                modified_lines.append(line)
            modified_content = "".join(f"{kept}\n" for kept in modified_lines)

            logging.info(f"Found and commented out {log_commands_found} log commands in the do file")
            if cls_commands_found > 0:
                logging.info(f"Found and commented out {cls_commands_found} cls commands in the do file")
            if graph_counter > 0:
                logging.info(f"Auto-named {graph_counter} graph commands")

            # Save the modified content to a temporary file
            with tempfile.NamedTemporaryFile(
                suffix='.do', delete=False, mode='w', encoding='utf-8'
            ) as temp_do:
                # First close any existing log files
                temp_do.write("capture log close _all\n")
                # Clean up Stata session state to prevent pollution from interrupted executions
                temp_do.write("capture program drop _all\n")
                temp_do.write("capture macro drop _all\n")
                # Run from the .do file's own directory so its relative paths work;
                # the log uses an absolute path and is unaffected by this cd
                temp_do.write(f"cd \"{do_file_dir}\"\n")
                # Note: _gr_list on is enabled externally before .do file execution
                temp_do.write(f"log using \"{custom_log_file}\", replace text\n")
                temp_do.write(modified_content)
                # Ensure all logs are closed at the end
                temp_do.write("\ncapture log close _all\n")
                # Note: We intentionally do NOT disable _gr_list so graphs persist for detection
                modified_do_file = temp_do.name
            logging.info(f"Created modified do file at {modified_do_file}")
        except Exception as e:
            error_msg = f"Error processing do file: {str(e)}"
            logging.error(error_msg)
            logging.error(f"Traceback: {traceback.format_exc()}")
            # Include line number and more details
            tb = traceback.extract_tb(e.__traceback__)
            if tb:
                last_frame = tb[-1]
                error_msg += f"\n at line {last_frame.lineno} in {last_frame.name}"
            return error_msg

        # Prepare command entry for history
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        command_entry = f"[{timestamp}] do '{file_path}'"
        # Initial text; it is extended on error paths and replaced on success
        result = f">>> {command_entry}\nExecuting Stata do file with timeout: {MAX_TIMEOUT} seconds ({MAX_TIMEOUT/60:.1f} minutes)...\n"

        # Double quotes work on every platform. The temp file name is generated by
        # tempfile, so it is not expected to contain a double quote that would need escaping.
        do_command = f'do "{modified_do_file}"'

        stata_thread = None
        stata_error = None
        # Run the command in background with timeout
        try:
            logging.info(f"Running modified do file: {do_command}")
            if has_stata and stata_available:
                # Enable graph listing for this do file execution using low-level API
                try:
                    from pystata.config import stlib, get_encode_str
                    stlib.StataSO_Execute(get_encode_str("qui _gr_list on"), False)
                    logging.debug("Enabled graph listing for do file")
                except Exception as e:
                    logging.warning(f"Could not enable graph listing: {str(e)}")

                # Record start time for timeout tracking
                start_time = time.time()
                last_update_time = start_time
                # One progress message per minute keeps the SSE connection alive
                update_interval = 60
                last_log_size = 0
                last_reported_lines = 0

                def run_stata_thread():
                    nonlocal stata_error
                    try:
                        # Use inline=False because inline=True calls _gr_list off!
                        globals()['stata'].run(do_command, echo=False, inline=False)
                    except Exception as e:
                        stata_error = str(e)

                # Execute via PyStata in a separate thread so this thread can poll
                stata_thread = threading.Thread(target=run_stata_thread)
                stata_thread.daemon = True
                stata_thread.start()

                # Poll for progress while command is running
                while stata_thread.is_alive():
                    current_time = time.time()
                    elapsed_time = current_time - start_time

                    # Check for timeout
                    if elapsed_time > MAX_TIMEOUT:
                        logging.warning(f"Execution timed out after {MAX_TIMEOUT} seconds")
                        result += f"\n*** TIMEOUT: Execution exceeded {MAX_TIMEOUT} seconds ({MAX_TIMEOUT/60:.1f} minutes) ***\n"
                        # Force terminate Stata operation with increasing severity
                        termination_successful = False
                        try:
                            # ATTEMPT 1: Send Stata break command
                            logging.warning(f"TIMEOUT - Attempt 1: Sending Stata break command")
                            try:
                                globals()['stata'].run("break", echo=False)
                                time.sleep(0.5)  # Give it a moment
                                if not stata_thread.is_alive():
                                    termination_successful = True
                                    logging.warning("Thread terminated via Stata break command")
                            except Exception as e:
                                logging.warning(f"Stata break command failed: {str(e)}")

                            # ATTEMPT 2: Try the private Thread._stop hook when it exists
                            # NOTE(review): _stop is an undocumented CPython internal and may
                            # simply raise on a running thread - confirm it ever succeeds.
                            if not termination_successful and hasattr(stata_thread, "_stop"):
                                logging.warning(f"TIMEOUT - Attempt 2: Forcing thread stop")
                                try:
                                    stata_thread._stop()
                                    time.sleep(0.5)  # Give it a moment
                                    if not stata_thread.is_alive():
                                        termination_successful = True
                                        logging.warning("Thread terminated via thread._stop")
                                except Exception as e:
                                    logging.warning(f"Thread stop failed: {str(e)}")

                            # ATTEMPT 3: Try to find and kill the Stata process (last resort)
                            # NOTE(review): `pkill -f stata` matches full command lines, so it
                            # may also match this server's own process if its command line
                            # contains "stata" - verify before relying on this path.
                            if not termination_successful:
                                logging.warning(f"TIMEOUT - Attempt 3: Looking for Stata process to terminate")
                                try:
                                    if platform.system() == "Windows":
                                        subprocess.run(["taskkill", "/F", "/IM", "stata*.exe"],
                                                       stdout=subprocess.DEVNULL,
                                                       stderr=subprocess.DEVNULL)
                                    else:
                                        subprocess.run(["pkill", "-f", "stata"],
                                                       stdout=subprocess.DEVNULL,
                                                       stderr=subprocess.DEVNULL)
                                    logging.warning("Sent kill signal to Stata processes")
                                except Exception as e:
                                    logging.error(f"Process kill failed: {str(e)}")
                        except Exception as term_error:
                            logging.error(f"Error during forced termination: {str(term_error)}")
                        # Set a flag indicating timeout regardless of termination success
                        stata_error = f"Operation timed out after {MAX_TIMEOUT} seconds"
                        logging.warning(f"Setting timeout error: {stata_error}")
                        break

                    # Check if it's time for an update
                    if current_time - last_update_time >= update_interval:
                        # IMPORTANT: Log progress frequently to keep SSE connection alive for long-running scripts
                        logging.info(f"⏱️ Execution in progress: {elapsed_time:.0f}s elapsed ({elapsed_time/60:.1f} minutes) of {MAX_TIMEOUT}s timeout")
                        if os.path.exists(custom_log_file):
                            current_log_size = os.path.getsize(custom_log_file)
                            # If log has grown, report progress
                            if current_log_size > last_log_size:
                                try:
                                    with open(custom_log_file, 'r', encoding='utf-8', errors='replace') as log:
                                        lines = log.read().splitlines()
                                    # Report only new lines since last update
                                    if last_reported_lines < len(lines):
                                        new_lines = lines[last_reported_lines:]
                                        # Only report meaningful lines (skip empty lines and headers)
                                        meaningful_lines = [ln for ln in new_lines if ln.strip() and not ln.startswith('-')]
                                        if meaningful_lines:
                                            progress_update = f"\n*** Progress update ({elapsed_time:.0f} seconds) ***\n"
                                            progress_update += "\n".join(meaningful_lines[-10:])  # Show last 10 lines
                                            result += progress_update
                                            # Also log the progress for SSE keep-alive
                                            logging.info(f"📊 Progress: Log file grew to {current_log_size} bytes, {len(meaningful_lines)} new meaningful lines")
                                        last_reported_lines = len(lines)
                                except Exception as e:
                                    logging.warning(f"Error reading log for progress update: {str(e)}")
                                last_log_size = current_log_size
                        last_update_time = current_time

                    # Sleep briefly to avoid consuming too much CPU
                    time.sleep(0.5)

                # Thread completed or timed out
                if stata_error:
                    error_msg = f"Error executing Stata command: {stata_error}"
                    logging.error(error_msg)
                    result += f"\n*** ERROR: {stata_error} ***\n"
                    # Add command to history and return
                    command_history.append({"command": command_entry, "result": result})
                    return result

                # Read final log output
                if os.path.exists(custom_log_file):
                    try:
                        with open(custom_log_file, 'r', encoding='utf-8', errors='replace') as log:
                            lines = log.read().splitlines()

                        # Skip the header: start after the first separator line
                        # found among the first 20 lines, if there is one
                        start_index = 0
                        for i, line in enumerate(lines[:20]):
                            if '-------------' in line:
                                start_index = i + 1
                                break

                        result_lines = []
                        for line in lines[start_index:]:
                            line = line.rstrip()
                            # Skip empty lines at beginning or redundant empty lines
                            if not line.strip() and (not result_lines or not result_lines[-1].strip()):
                                continue
                            # Clean up SMCL formatting if present
                            if '{' in line:
                                line = re.sub(r'\{[^}]*\}', '', line)  # Remove {...} codes
                            result_lines.append(line)

                        # Add completion message with final log content
                        completion_msg = f"\n*** Execution completed in {time.time() - start_time:.1f} seconds ***\n"
                        completion_msg += "Final output:\n"
                        completion_msg += "\n".join(result_lines)
                        # Replace the result with a clean summary
                        result = f">>> {command_entry}\n{completion_msg}"

                        # Only detect and export graphs if called from VS Code extension (not from LLM/MCP)
                        if auto_name_graphs:
                            # _gr_list was switched on before the run and is not switched
                            # off by the do-file, so the graphs should still be listed
                            try:
                                logging.debug("Attempting to detect graphs from do file (interactive mode)...")
                                graphs = display_graphs_interactive(graph_format='png', width=800, height=600)
                                logging.debug(f"Graph detection returned: {graphs}")
                                if graphs:
                                    graph_info = "\n\n" + "="*60 + "\n"
                                    graph_info += f"GRAPHS DETECTED: {len(graphs)} graph(s) created\n"
                                    graph_info += "="*60 + "\n"
                                    for graph in graphs:
                                        # Include command if available, using special format for JavaScript parsing
                                        if 'command' in graph and graph['command']:
                                            graph_info += f" • {graph['name']}: {graph['path']} [CMD: {graph['command']}]\n"
                                        else:
                                            graph_info += f" • {graph['name']}: {graph['path']}\n"
                                    result += graph_info
                                    logging.info(f"Detected {len(graphs)} graphs from do file: {[g['name'] for g in graphs]}")
                                else:
                                    logging.debug("No graphs detected from do file")
                            except Exception as e:
                                logging.warning(f"Error detecting graphs: {str(e)}")
                                logging.debug(f"Graph detection error details: {traceback.format_exc()}")

                        # Log the final file location
                        result += f"\n\nLog file saved to: {custom_log_file}"
                    except Exception as e:
                        logging.error(f"Error reading final log: {str(e)}")
                        result += f"\n*** WARNING: Error reading final log: {str(e)} ***\n"
                else:
                    logging.warning(f"Log file not found after execution: {custom_log_file}")
                    result += f"\n*** WARNING: Log file not found after execution ***\n"
                    # Try to get a status update from Stata
                    try:
                        status = run_stata_command("display _rc", clear_history=False)
                        result += f"\nStata return code: {status}\n"
                    except Exception as e:
                        # Best effort only, but leave a trace instead of failing silently
                        logging.debug(f"Could not query Stata return code: {str(e)}")
            else:
                # Stata not available
                error_msg = "Stata is not available. Please check if Stata is installed and configured correctly."
                logging.error(error_msg)
                result = f">>> {command_entry}\n{error_msg}"
        except Exception as e:
            error_msg = f"Error running do file: {str(e)}"
            logging.error(error_msg)
            result = f">>> {command_entry}\n{error_msg}"
        finally:
            # Remove the temporary do-file so repeated runs do not pile up temp files.
            # It is kept when the worker thread is still alive (timed-out run), because
            # Stata may still be reading it.
            if stata_thread is None or not stata_thread.is_alive():
                try:
                    os.remove(modified_do_file)
                except OSError as cleanup_error:
                    logging.debug(f"Could not remove temporary do file {modified_do_file}: {str(cleanup_error)}")

        # Add to command history and return result
        command_history.append({"command": command_entry, "result": result})
        return result
    except Exception as e:
        error_msg = f"Error in run_stata_file: {str(e)}"
        logging.error(error_msg)
        return error_msg


def _add_graph_name_option(rest, graph_name):
    """Return the text following a graph command with a name() option added.

    The option is placed right after the first comma that is outside any
    parentheses and double quotes, i.e. where the command's own option list starts.
    A comma inside a sub-plot such as ``(scatter y x, msize(small))`` belongs to that
    sub-plot and is skipped. Without such a comma the option is appended after a new comma.
    """
    name_option = f', name({graph_name}, replace)'
    depth = 0
    in_quotes = False
    for pos, char in enumerate(rest):
        if char == '"':
            in_quotes = not in_quotes
        elif in_quotes:
            continue
        elif char == '(':
            depth += 1
        elif char == ')':
            # Tolerate unbalanced input instead of going negative
            depth = max(depth - 1, 0)
        elif char == ',' and depth == 0:
            return rest[:pos] + name_option + rest[pos + 1:]
    return rest.rstrip() + name_option
# Function to kill any process using the specified port
def kill_process_on_port(port):
    """Kill the process(es) listening on the given TCP port, then re-check the port.

    Only listeners are targeted: on Windows the local-address column of
    ``netstat -ano`` must end in ``:<port>`` on a LISTENING row, elsewhere
    ``lsof`` is restricted to sockets in the LISTEN state. Processes that merely
    hold a client connection involving the port, and this process itself, are
    left alone.

    Args:
        port: TCP port number to free.

    Returns:
        None. Outcomes are logged; errors are logged as warnings, not raised.
    """
    try:
        on_windows = platform.system() == "Windows"
        own_pid = os.getpid()

        if on_windows:
            netstat_output = subprocess.check_output(["netstat", "-ano"]).decode(errors="replace")
            listener_pids = []
            for row in netstat_output.splitlines():
                fields = row.split()
                # Expected columns: Proto, Local Address, Foreign Address, State, PID.
                # Matching the end of the local address avoids hitting e.g. :8080
                # (or a remote address) when looking for :80.
                if (
                    len(fields) >= 5
                    and "LISTENING" in fields
                    and fields[1].endswith(f":{port}")
                    and fields[-1] not in listener_pids
                ):
                    listener_pids.append(fields[-1])
        else:
            try:
                # -t prints bare PIDs; -sTCP:LISTEN restricts the result to listeners
                lsof_output = subprocess.check_output(
                    ["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"]
                ).decode(errors="replace")
            except subprocess.CalledProcessError:
                # lsof exits non-zero when nothing matches
                lsof_output = ""
            # De-duplicate while keeping the reported order
            listener_pids = list(dict.fromkeys(lsof_output.split()))

        if not listener_pids:
            logging.info(f"No process found using port {port}")

        kill_attempted = False
        for pid in listener_pids:
            # Never kill ourselves, and ignore anything that is not a plain PID
            if not pid.isdigit() or int(pid) == own_pid:
                continue
            logging.info(f"Found process with PID {pid} using port {port}")
            kill_attempted = True
            try:
                if on_windows:
                    subprocess.check_output(["taskkill", "/F", "/PID", pid])
                else:
                    # Use SIGKILL for more forceful termination
                    os.kill(int(pid), signal.SIGKILL)
                logging.info(f"Killed process with PID {pid}")
            except Exception as kill_error:
                logging.warning(f"Error killing process with PID {pid}: {str(kill_error)}")

        if kill_attempted:
            # Wait a moment to ensure the port is released
            time.sleep(1)
    except Exception as e:
        logging.warning(f"Error killing process on port {port}: {str(e)}")

    # Double-check if port is still in use
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            result = s.connect_ex(('localhost', port))
            if result == 0:
                logging.warning(f"Port {port} is still in use after attempting to kill processes")
                logging.warning(f"Please manually kill any processes using port {port} or use a different port")
            else:
                logging.info(f"Port {port} is now available")
    except Exception as socket_error:
        logging.warning(f"Error checking port availability: {str(socket_error)}")
# Function to find an available port
def find_available_port(start_port, max_attempts=10):
    """Return the first port at or after start_port that nothing answers on.

    Up to max_attempts consecutive ports are probed with a one-second TCP
    connect to localhost; a failed connect is taken to mean the port is free.

    Returns:
        The first free port number, or None when every probed port is in use
        or could not be checked.
    """
    for attempt in range(max_attempts):
        candidate = start_port + attempt
        try:
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with probe:
                probe.settimeout(1)
                connect_status = probe.connect_ex(('localhost', candidate))
                # Non-zero status: nobody accepted the connection, so the port is available
                if connect_status != 0:
                    logging.info(f"Found available port: {candidate}")
                    return candidate
        except Exception as e:
            logging.warning(f"Error checking port {candidate}: {str(e)}")

    # Every candidate was either taken or failed the check
    logging.warning(f"Could not find an available port after {max_attempts} attempts")
    return None
# Parameter models for the MCP tools
class RunSelectionParams(BaseModel):
    # Required field (no default): the Stata code to run
    selection: str = Field(
        ...,
        description="The Stata code to execute",
    )
class RunFileParams(BaseModel):
    # Required field (no default): path of the .do file to run
    file_path: str = Field(
        ...,
        description="The full path to the .do file",
    )
    # Optional field: execution timeout, 600 when omitted
    timeout: int = Field(
        default=600,
        description="Timeout in seconds (default: 600 seconds / 10 minutes)",
    )
# Define Legacy VS Code Extension Support
class ToolRequest(BaseModel):
    # Name of the tool being requested
    tool: str
    # Arguments for that tool, keyed by parameter name
    parameters: Dict[str, Any]
class ToolResponse(BaseModel):
    # Required status string
    status: str
    # Optional result text; None when omitted
    result: Optional[str] = None
    # Optional message text; None when omitted
    message: Optional[str] = None
# Define lifespan context manager for startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the optional HTTP session manager hooks around the application's lifetime.

    Before yielding, ``app.state._http_session_manager_starter`` is awaited when
    that attribute exists; after the yield the same is done for
    ``app.state._http_session_manager_stopper``.
    """
    state = app.state

    # --- startup ---
    logging.info("FastAPI application starting up")
    if hasattr(state, '_http_session_manager_starter'):
        logging.debug("Calling HTTP session manager startup handler")
        await state._http_session_manager_starter()

    # The application serves requests while suspended here
    yield

    # --- shutdown ---
    if hasattr(state, '_http_session_manager_stopper'):
        logging.debug("Calling HTTP session manager shutdown handler")
        await state._http_session_manager_stopper()
    logging.info("FastAPI application shutting down")
# Create the FastAPI app with lifespan handler
app = FastAPI(
    lifespan=lifespan,  # startup/shutdown hooks
    title=SERVER_NAME,
    version=SERVER_VERSION,
    description="Stata MCP Server - Exposes Stata functionality to AI models via MCP protocol",
)
# Define regular FastAPI routes for Stata functions
@app.post("/run_selection", operation_id="stata_run_selection", response_class=Response)
async def stata_run_selection_endpoint(selection: str) -> Response:
    """Execute the given Stata code and return its output as plain text."""
    logging.info(f"Running selection: {selection}")
    raw_output = run_stata_selection(selection)
    # Turn literal backslash-n sequences into real line breaks for display
    return Response(
        content=raw_output.replace("\\n", "\n"),
        media_type="text/plain",
    )
async def stata_run_file_stream(file_path: str, timeout: int = 600):
    """Async generator that runs Stata file and yields SSE progress events

    run_stata_file is executed in a daemon thread. While it runs, an
    "Executing..." event is emitted about every two seconds. When it finishes,
    the output is emitted in chunks of ten lines, with line breaks inside an
    event encoded as a literal backslash-n so each event stays on one
    ``data:`` line.

    Args:
        file_path: Path to the .do file
        timeout: Timeout in seconds

    Yields:
        SSE formatted events with progress updates
    """
    import queue
    import threading

    # Carries the single (status, payload) tuple from the worker thread
    result_queue = queue.Queue()

    def run_with_progress():
        """Run the do-file in the worker thread and publish the outcome."""
        try:
            result_queue.put(('success', run_stata_file(file_path, timeout=timeout)))
        except Exception as e:
            result_queue.put(('error', str(e)))

    # Start execution thread
    thread = threading.Thread(target=run_with_progress, daemon=True)
    thread.start()

    # Yield initial event
    yield f"data: Starting execution of {os.path.basename(file_path)}...\n\n"

    start_time = time.time()
    last_check = start_time
    check_interval = 2.0  # Check every 2 seconds for responsive streaming

    # Monitor progress
    while thread.is_alive():
        current_time = time.time()
        elapsed = current_time - start_time
        if current_time - last_check >= check_interval:
            yield f"data: Executing... {elapsed:.1f}s elapsed\n\n"
            last_check = current_time
        # Sleep briefly to avoid busy waiting
        await asyncio.sleep(0.1)
        # Check if execution exceeded timeout
        if elapsed > timeout:
            yield f"data: ERROR: Execution timed out after {timeout}s\n\n"
            break

    # Get final result. The wait (up to one second after a timeout) happens in a
    # worker thread so the event loop is never blocked by queue.get.
    try:
        status, result = await asyncio.to_thread(result_queue.get, timeout=1.0)
    except queue.Empty:
        yield "data: ERROR: Failed to get execution result\n\n"
        return

    if status == 'error':
        # Keep a multi-line message on a single data line, as for regular output
        escaped_error = str(result).replace('\n', '\\n')
        yield f"data: ERROR: {escaped_error}\n\n"
        return

    # Format and send final output
    formatted_result = result.replace("\\n", "\n")
    # Split into chunks to avoid overwhelming SSE
    lines = formatted_result.split('\n')
    for i in range(0, len(lines), 10):
        chunk = '\n'.join(lines[i:i+10])
        # Escape newlines in SSE data field
        escaped_chunk = chunk.replace('\n', '\\n')
        yield f"data: {escaped_chunk}\n\n"
        await asyncio.sleep(0.05)  # Small delay between chunks
    yield "data: *** Execution completed ***\n\n"
@app.get("/run_file", operation_id="stata_run_file", response_class=Response)
async def stata_run_file_endpoint(
    file_path: str,
    timeout: int = 600
) -> Response:
    """Run a Stata .do file and return the output (MCP-compatible endpoint)

    Args:
        file_path: Path to the .do file
        timeout: Timeout in seconds (default: 600 seconds / 10 minutes)

    Returns:
        Response with plain text output
    """
    # NOTE: the docstring wording above is intentionally left untouched; the
    # tool description hard-coded for the HTTP transport mirrors it.

    # Coerce the timeout to a positive integer, reverting to 600 s otherwise.
    try:
        timeout = int(timeout)
    except (ValueError, TypeError):
        logging.warning(f"Non-integer timeout value: {timeout}, using default 600")
        timeout = 600
    else:
        if timeout <= 0:
            logging.warning(f"Invalid timeout value: {timeout}, using default 600")
            timeout = 600

    logging.info(f"Running file: {file_path} with timeout {timeout} seconds ({timeout/60:.1f} minutes)")

    # The runner blocks, so it is pushed to a worker thread to keep the loop free.
    stata_output = await asyncio.to_thread(run_stata_file, file_path, timeout=timeout)

    # The runner may hand back literal "\n" sequences; turn them into real newlines.
    formatted_result = stata_output.replace("\\n", "\n")

    # Only a short prefix is logged to keep debug logs readable.
    logging.debug(f"Run file output (first 100 chars): {formatted_result[:100]}...")

    return Response(content=formatted_result, media_type="text/plain")
@app.get("/run_file/stream")
async def stata_run_file_stream_endpoint(
    file_path: str,
    timeout: int = 600
):
    """Run a Stata .do file and stream the output via Server-Sent Events (SSE)

    This is a separate endpoint for HTTP clients that want real-time streaming updates.
    For MCP clients, use the regular /run_file endpoint.

    Args:
        file_path: Path to the .do file
        timeout: Timeout in seconds (default: 600 seconds / 10 minutes)

    Returns:
        StreamingResponse with text/event-stream content type
    """
    # Coerce the timeout to a positive integer, reverting to 600 s otherwise.
    try:
        timeout = int(timeout)
    except (ValueError, TypeError):
        logging.warning(f"Non-integer timeout value: {timeout}, using default 600")
        timeout = 600
    else:
        if timeout <= 0:
            logging.warning(f"Invalid timeout value: {timeout}, using default 600")
            timeout = 600

    logging.info(f"[STREAM] Running file: {file_path} with timeout {timeout} seconds ({timeout/60:.1f} minutes)")

    # The async generator produces the SSE-formatted events for this run.
    event_source = stata_run_file_stream(file_path, timeout)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Disable nginx buffering
    }
    return StreamingResponse(
        event_source,
        media_type="text/event-stream",
        headers=sse_headers,
    )
# MCP server will be initialized in main() after args are parsed
# Add FastAPI endpoint for legacy VS Code extension
@app.post("/v1/tools", include_in_schema=False)
async def call_tool(request: ToolRequest) -> ToolResponse:
    """Dispatch a legacy VS Code extension tool request to the Stata runners.

    The request names a tool (``run_selection`` / ``run_file``, or the
    ``stata_``-prefixed equivalents) and carries its parameters in
    ``request.parameters``:

    * ``run_selection``: requires ``selection``; optional ``working_dir``.
    * ``run_file``: requires ``file_path``; optional ``timeout`` in seconds
      (default 600, also used when the value is non-integer or <= 0).

    Returns:
        ToolResponse with ``status="success"`` and the runner output in
        ``result``, or ``status="error"`` and a ``message`` for an unknown
        tool, a missing required parameter, or any exception raised while
        handling the request.
    """
    try:
        # Map VS Code extension tool names to MCP tool names
        tool_name_map = {
            "run_selection": "stata_run_selection",
            "run_file": "stata_run_file"
        }

        # Unmapped names pass through unchanged so the prefixed names work too
        mcp_tool_name = tool_name_map.get(request.tool, request.tool)

        # Log the request
        logging.info(f"REST API request for tool: {request.tool} -> {mcp_tool_name}")

        # Check if the tool exists
        if mcp_tool_name not in ["stata_run_selection", "stata_run_file"]:
            return ToolResponse(
                status="error",
                message=f"Unknown tool: {request.tool}"
            )

        # Execute the appropriate function
        if mcp_tool_name == "stata_run_selection":
            if "selection" not in request.parameters:
                return ToolResponse(
                    status="error",
                    message="Missing required parameter: selection"
                )

            # Get optional working_dir parameter
            working_dir = request.parameters.get("working_dir", None)

            # Enable auto_detect_graphs for VS Code extension calls
            # NOTE(review): this call still runs on the event loop thread and
            # blocks it while Stata works. No off-loop use of
            # run_stata_selection is visible here - confirm it is safe to run
            # in a worker thread before moving it as well.
            result = run_stata_selection(request.parameters["selection"], working_dir=working_dir, auto_detect_graphs=True)

            # Format output for better display
            result = result.replace("\\n", "\n")

        elif mcp_tool_name == "stata_run_file":
            if "file_path" not in request.parameters:
                return ToolResponse(
                    status="error",
                    message="Missing required parameter: file_path"
                )

            # Get the file path from the parameters
            file_path = request.parameters["file_path"]

            # Get timeout parameter if provided, otherwise use default (10 minutes)
            timeout = request.parameters.get("timeout", 600)
            try:
                timeout = int(timeout)  # Ensure it's an integer
                if timeout <= 0:
                    logging.warning(f"Invalid timeout value: {timeout}, using default 600")
                    timeout = 600
            except (ValueError, TypeError):
                logging.warning(f"Non-integer timeout value: {timeout}, using default 600")
                timeout = 600

            logging.info(f"MCP run_file request for: {file_path} with timeout {timeout} seconds ({timeout/60:.1f} minutes)")

            # Normalize the path for cross-platform compatibility
            file_path = os.path.normpath(file_path)

            # On Windows, convert forward slashes to backslashes if needed
            if platform.system() == "Windows" and '/' in file_path:
                file_path = file_path.replace('/', '\\')

            # Run the file in a worker thread: the call can block for the whole
            # timeout, and running it inline would freeze every other request
            # served by this event loop. This matches how the /run_file
            # endpoint invokes run_stata_file.
            # Enable auto_name_graphs for VS Code extension calls
            result = await asyncio.to_thread(
                run_stata_file, file_path, timeout=timeout, auto_name_graphs=True
            )

            # Format output for better display
            result = result.replace("\\n", "\n")

            # Log the output length for debugging
            logging.debug(f"MCP run_file output length: {len(result)}")

            # If no output was captured, log a warning
            if "Command executed but" in result and "output not captured" in result:
                logging.warning(f"No output captured for file: {file_path}")

            # If file not found error, make the message more helpful
            if "File not found" in result:
                # Add help text explaining common issues with Windows paths
                if platform.system() == "Windows":
                    result += "\n\nCommon Windows path issues:\n"
                    result += "1. Make sure the file path uses correct separators (use \\ instead of /)\n"
                    result += "2. Check if the file exists in the specified location\n"
                    result += "3. If using relative paths, the current working directory is: " + os.getcwd()

        # Return successful response
        return ToolResponse(
            status="success",
            result=result
        )

    except Exception as e:
        # Boundary handler: the caller always gets a well-formed ToolResponse
        logging.error(f"Error handling tool request: {str(e)}")
        return ToolResponse(
            status="error",
            message=f"Server error: {str(e)}"
        )
# Lightweight liveness probe: reports module-level state only and never
# issues a Stata command.
@app.get("/health", include_in_schema=False)
async def health_check():
    """Return the server name, version and the current Stata availability flag."""
    status_report = {
        "status": "ok",
        "service": SERVER_NAME,
        "version": SERVER_VERSION,
        "stata_available": stata_available,
    }
    return status_report
# Endpoint to serve graph images
# Hidden from OpenAPI schema so it won't be exposed to LLMs via MCP
@app.get("/graphs/{graph_name}", include_in_schema=False)
async def get_graph(graph_name: str):
    """Serve a graph image file

    Args:
        graph_name: File name of the graph, with or without the ``.png``
            extension. It must be a bare file name; anything carrying a
            directory component or drive prefix is treated as not found.

    Returns:
        Response with the PNG bytes (``image/png``), a 404 plain-text
        response when the graph does not exist (or the name is not a bare
        file name), or a 500 response if reading the file fails.
    """
    try:
        # Construct the path to the graph file
        if extension_path:
            graphs_dir = os.path.join(extension_path, 'graphs')
        else:
            graphs_dir = os.path.join(tempfile.gettempdir(), 'stata_mcp_graphs')

        # Support both with and without .png extension
        if not graph_name.endswith('.png'):
            graph_name = f"{graph_name}.png"

        # graph_name comes straight from the URL. os.path.join would let a name
        # with a path separator or drive prefix (e.g. "..\\..\\x.png" on
        # Windows) resolve outside graphs_dir, so only bare file names pass.
        is_bare_name = os.path.basename(graph_name) == graph_name

        graph_path = os.path.join(graphs_dir, graph_name)

        # Check if file exists
        if not is_bare_name or not os.path.exists(graph_path):
            return Response(
                content=f"Graph not found: {graph_name}",
                status_code=404,
                media_type="text/plain"
            )

        # Read and return the image file
        with open(graph_path, 'rb') as f:
            image_data = f.read()

        return Response(content=image_data, media_type="image/png")

    except Exception as e:
        logging.error(f"Error serving graph {graph_name}: {str(e)}")
        return Response(
            content=f"Error serving graph: {str(e)}",
            status_code=500
        )
@app.post("/clear_history", include_in_schema=False)
async def clear_history_endpoint():
    """Empty the server-side command history.

    Returns:
        dict with ``status`` ("success" or "error") and a ``message``; on
        success the message states how many entries were discarded.
    """
    global command_history

    try:
        count = len(command_history)
        # Rebind to a fresh list rather than mutating the old one in place.
        command_history = []
        logging.info(f"Cleared command history ({count} items)")
    except Exception as e:
        logging.error(f"Error clearing history: {str(e)}")
        return {"status": "error", "message": str(e)}

    return {"status": "success", "message": f"Cleared {count} items from history"}
def _view_data_json(payload, status_code=200):
    """Serialize *payload* with json.dumps and wrap it in a JSON Response."""
    return Response(
        content=json.dumps(payload),
        media_type="application/json",
        status_code=status_code
    )


@app.get("/view_data", include_in_schema=False)
async def view_data_endpoint(if_condition: str = None):
    """Get current Stata data as a pandas DataFrame and return as JSON

    Args:
        if_condition: Optional Stata if condition (e.g., "price > 5000 & mpg < 30").
            When given, it is interpolated into a Stata ``generate`` command
            that builds a temporary ``_filter_marker`` variable; only rows
            whose marker equals 1 are returned.

    Returns:
        JSON Response. On success the body holds ``status``, ``data`` (list
        of rows), ``columns``, ``dtypes``, ``rows`` and ``index``; when the
        resulting frame is empty it holds empty ``data``/``columns`` and
        ``rows`` = 0. Errors carry ``status="error"`` and a ``message``:
        HTTP 500 if Stata is not initialized or an unexpected error occurs,
        HTTP 400 if applying the filter fails.
    """
    try:
        if not stata_available or stata is None:
            logging.error("Stata is not available")
            return _view_data_json(
                {
                    "status": "error",
                    "message": "Stata is not initialized"
                },
                status_code=500
            )

        # Apply if condition if provided
        if if_condition:
            logging.info(f"Applying filter: if {if_condition}")

            try:
                # Get full data first (before the marker variable exists, so
                # the marker never shows up as a column)
                df = stata.pdataframe_from_data()

                if df is None or df.empty:
                    raise Exception("No data currently loaded in Stata")

                # Use Stata to create a filter marker variable
                try:
                    import sfi

                    # First, check if variable already exists and drop it.
                    # Best-effort: a failure here is only logged.
                    try:
                        stata.run("capture drop _filter_marker", inline=False, echo=False)
                    except Exception as drop_err:
                        logging.debug(f"Ignoring error while dropping stale filter marker: {drop_err}")

                    # Generate marker for rows that match the condition
                    gen_cmd = f"quietly generate byte _filter_marker = ({if_condition})"
                    logging.debug(f"Running filter command: {gen_cmd}")

                    try:
                        stata.run(gen_cmd, inline=False, echo=False)
                        logging.debug(f"Generate command executed successfully")
                    except SystemError as se:
                        logging.error(f"SystemError in generate command: {str(se)}")
                        raise Exception(f"Invalid condition syntax: {if_condition}") from se
                    except Exception as e:
                        logging.error(f"Exception in generate command: {type(e).__name__}: {str(e)}")
                        raise Exception(f"Error creating filter: {str(e)}") from e

                    # Get the marker variable values using SFI
                    n_obs = sfi.Data.getObsTotal()
                    logging.debug(f"Total observations: {n_obs}")

                    # Get the variable index for _filter_marker
                    var_index = sfi.Data.getVarIndex('_filter_marker')
                    logging.debug(f"Filter marker variable index: {var_index}")

                    if var_index < 0:
                        raise Exception("Failed to create filter marker variable")

                    # Read the filter values for all observations
                    # NOTE: sfi.Data.get() returns nested lists like [[1]] or [[0]]
                    # We need to extract the actual value
                    filter_mask = []
                    for i in range(n_obs):
                        val = sfi.Data.get('_filter_marker', i)
                        # Extract the actual value from nested list structure
                        if isinstance(val, list) and len(val) > 0:
                            if isinstance(val[0], list) and len(val[0]) > 0:
                                actual_val = val[0][0]
                            else:
                                actual_val = val[0]
                        else:
                            actual_val = val
                        filter_mask.append(actual_val == 1)

                    # Debug: Log first few values and count
                    true_count = sum(filter_mask)
                    if n_obs > 0:
                        sample_vals = [sfi.Data.get('_filter_marker', i) for i in range(min(5, n_obs))]
                        logging.debug(f"First 5 marker values (raw): {sample_vals}")
                    logging.debug(f"Filter mask true count: {true_count} out of {n_obs}")

                    # Drop the temporary marker
                    stata.run("quietly drop _filter_marker", inline=False, echo=False)

                    # Filter the DataFrame using the mask
                    df = df[filter_mask].reset_index(drop=True)
                    logging.info(f"Filtered data: {len(df)} rows match condition (out of {n_obs} total)")

                except Exception as stata_err:
                    # Clean up if there's an error so the marker does not
                    # linger in the user's dataset. Best-effort only.
                    try:
                        stata.run("capture drop _filter_marker", inline=False, echo=False)
                    except Exception as cleanup_err:
                        logging.debug(f"Ignoring error while cleaning up filter marker: {cleanup_err}")
                    logging.error(f"Filter processing error: {type(stata_err).__name__}: {str(stata_err)}")
                    raise Exception(f"{str(stata_err)}") from stata_err

            except Exception as filter_err:
                logging.error(f"Filter error: {str(filter_err)}")
                return _view_data_json(
                    {
                        "status": "error",
                        "message": f"Filter error: {str(filter_err)}"
                    },
                    status_code=400
                )
        else:
            # Get data as pandas DataFrame without filtering
            logging.info("Getting data from Stata using pdataframe_from_data()")
            df = stata.pdataframe_from_data()

        # Check if data is empty (also reached when a filter matched no rows)
        if df is None or df.empty:
            logging.info("No data currently loaded in Stata")
            return _view_data_json({
                "status": "success",
                "message": "No data currently loaded",
                "data": [],
                "columns": [],
                "rows": 0
            })

        # Get data info
        rows, cols = df.shape
        logging.info(f"Data retrieved: {rows} observations, {cols} variables")

        # Convert DataFrame to JSON format
        # Replace NaN with None for proper JSON serialization
        df_clean = df.replace({float('nan'): None})

        # Convert to list of lists for better performance
        data_values = df_clean.values.tolist()
        column_names = df_clean.columns.tolist()

        # Get data types for each column (taken from the unmodified frame)
        dtypes = {col: str(df[col].dtype) for col in df.columns}

        return _view_data_json({
            "status": "success",
            "data": data_values,
            "columns": column_names,
            "dtypes": dtypes,
            "rows": int(rows),
            "index": df.index.tolist()
        })

    except Exception as e:
        error_msg = f"Error getting data: {str(e)}"
        logging.error(error_msg)
        logging.error(traceback.format_exc())
        return _view_data_json(
            {
                "status": "error",
                "message": error_msg
            },
            status_code=500
        )
@app.get("/interactive", include_in_schema=False)
async def interactive_window(file: str = None, code: str = None):
    """Serve the interactive Stata window as a full webpage

    Args:
        file: Optional path of a .do file to run when the page loads.
        code: Optional Stata code to run when the page loads.

    Both parameters are declared so they appear in the route signature, but
    the server does not use them: the page's own script reads ``file`` and
    ``code`` from ``window.location.search`` and posts them to ``/v1/tools``.

    Returns:
        Response containing the static HTML page (``text/html``).
    """
    # Plain (non-f) string so the braces in the CSS/JS need no escaping.
    # The page contains no server-side placeholders; it is returned verbatim.
    html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Stata Interactive Window</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #1e1e1e;
color: #d4d4d4;
height: 100vh;
display: flex;
flex-direction: column;
}
.main-container {
display: flex;
flex: 1;
overflow: hidden;
}
.left-panel {
flex: 1;
display: flex;
flex-direction: column;
border-right: 1px solid #3e3e42;
overflow: hidden;
}
.output-section {
flex: 1;
overflow-y: auto;
padding: 20px;
}
.output-cell {
border-left: 3px solid #007acc;
padding-left: 15px;
margin-bottom: 20px;
background: #252526;
padding: 15px;
border-radius: 4px;
}
.command-line {
color: #4fc1ff;
font-weight: bold;
margin-bottom: 10px;
font-family: 'Consolas', 'Monaco', monospace;
}
.command-output {
font-family: 'Consolas', 'Monaco', monospace;
white-space: pre-wrap;
font-size: 13px;
line-height: 1.5;
}
.input-section {
border-top: 1px solid #3e3e42;
padding: 20px;
background: #252526;
}
.input-container {
display: flex;
gap: 10px;
}
#command-input {
flex: 1;
background: #3c3c3c;
border: 1px solid #6c6c6c;
color: #d4d4d4;
padding: 12px 15px;
font-family: 'Consolas', 'Monaco', monospace;
font-size: 14px;
border-radius: 4px;
}
#command-input:focus {
outline: none;
border-color: #007acc;
}
#run-button {
background: #0e639c;
color: white;
border: none;
padding: 12px 30px;
font-weight: 600;
cursor: pointer;
border-radius: 4px;
transition: background 0.2s;
}
#run-button:hover {
background: #1177bb;
}
#run-button:disabled {
background: #555;
cursor: not-allowed;
}
.right-panel {
width: 40%;
overflow-y: auto;
padding: 20px;
background: #1e1e1e;
}
.graphs-title {
font-size: 20px;
font-weight: 600;
margin-bottom: 20px;
color: #ffffff;
}
.graph-card {
background: #252526;
border: 1px solid #3e3e42;
border-radius: 8px;
padding: 20px;
margin-bottom: 20px;
}
.graph-card h3 {
margin-bottom: 15px;
color: #ffffff;
}
.graph-card img {
width: 100%;
height: auto;
border-radius: 4px;
}
.error {
background: #5a1d1d;
border-left: 3px solid #f48771;
padding: 15px;
border-radius: 4px;
margin-bottom: 20px;
}
.hint {
color: #858585;
font-size: 12px;
margin-top: 8px;
}
.no-graphs {
color: #858585;
font-style: italic;
text-align: center;
padding: 40px;
}
</style>
</head>
<body>
<div class="main-container">
<div class="left-panel">
<div class="output-section" id="output-container"></div>
<div class="input-section">
<div class="input-container">
<input type="text" id="command-input"
placeholder="Enter Stata command (e.g., summarize, scatter y x, regress y x)..."
autocomplete="off" />
<button id="run-button">Run</button>
</div>
<div class="hint">Press Enter to execute • Ctrl+L to clear output</div>
</div>
</div>
<div class="right-panel">
<div class="graphs-title">Graphs</div>
<div id="graphs-container">
<div class="no-graphs">No graphs yet. Run commands to generate graphs.</div>
</div>
</div>
</div>
<script>
const commandInput = document.getElementById('command-input');
const runButton = document.getElementById('run-button');
const outputContainer = document.getElementById('output-container');
const graphsContainer = document.getElementById('graphs-container');
runButton.addEventListener('click', executeCommand);
commandInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') executeCommand();
});
document.addEventListener('keydown', async (e) => {
if (e.ctrlKey && e.key === 'l') {
e.preventDefault();
// Clear text output visually
outputContainer.innerHTML = '';
// Clear graphs visually
graphsContainer.innerHTML = '<div class="no-graphs">No graphs yet. Run commands to generate graphs.</div>';
// Clear server-side command history so it doesn't come back
try {
const response = await fetch('/clear_history', {
method: 'POST',
headers: { 'Content-Type': 'application/json' }
});
const data = await response.json();
console.log('History cleared:', data.message);
} catch (err) {
console.error('Error clearing history:', err);
}
}
});
async function executeCommand() {
const command = commandInput.value.trim();
if (!command) return;
runButton.disabled = true;
runButton.textContent = 'Running...';
try {
const response = await fetch('/v1/tools', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool: 'run_selection',
parameters: { selection: command }
})
});
const data = await response.json();
if (data.status === 'success') {
addOutputCell(command, data.result);
updateGraphs(data.result);
} else {
addError(data.message || 'Command failed');
}
} catch (error) {
addError(error.message);
}
runButton.disabled = false;
runButton.textContent = 'Run';
commandInput.value = '';
commandInput.focus();
}
function addOutputCell(command, output) {
const cell = document.createElement('div');
cell.className = 'output-cell';
cell.innerHTML = `
<div class="command-line">> ${escapeHtml(command)}</div>
<div class="command-output">${escapeHtml(output)}</div>
`;
outputContainer.appendChild(cell);
outputContainer.scrollTop = outputContainer.scrollHeight;
}
function addError(message) {
const error = document.createElement('div');
error.className = 'error';
error.textContent = 'Error: ' + message;
outputContainer.appendChild(error);
outputContainer.scrollTop = outputContainer.scrollHeight;
}
function updateGraphs(output) {
// Updated regex to capture optional command: • name: path [CMD: command]
// Use [^\\n\\[] to stop at newlines or opening bracket
const graphRegex = /• ([^:]+): ([^\\n\\[]+)(?:\\[CMD: ([^\\]]+)\\])?/g;
const matches = [...output.matchAll(graphRegex)];
if (matches.length > 0) {
// Remove "no graphs" message if it exists
const noGraphsMsg = graphsContainer.querySelector('.no-graphs');
if (noGraphsMsg) {
graphsContainer.innerHTML = '';
}
// Add or update each graph
matches.forEach(match => {
const name = match[1].trim();
const path = match[2].trim();
const command = match[3] ? match[3].trim() : null;
// Check if graph already exists
const existingGraph = graphsContainer.querySelector(`[data-graph-name="${name}"]`);
if (existingGraph) {
// Update existing graph - force reload by adding timestamp
updateGraph(existingGraph, name, `/graphs/${encodeURIComponent(name)}`, command);
} else {
// Add new graph
addGraph(name, `/graphs/${encodeURIComponent(name)}`, command);
}
});
}
}
function updateGraph(existingCard, name, url, command) {
// Force reload by adding timestamp to bypass cache
const timestamp = new Date().getTime();
const urlWithTimestamp = `${url}?t=${timestamp}`;
const commandHtml = command ? `<div style="color: #858585; font-size: 12px; margin-bottom: 8px; font-family: 'Courier New', monospace; background: #1a1a1a; padding: 6px; border-radius: 3px; border-left: 3px solid #4a9eff;">$ ${escapeHtml(command)}</div>` : '';
existingCard.innerHTML = `
<h3>${escapeHtml(name)}</h3>
${commandHtml}
<img src="${urlWithTimestamp}" alt="${escapeHtml(name)}"
onerror="this.parentElement.innerHTML='<p style=\\'color:#f48771\\'>Failed to load graph</p>'">
`;
}
function addGraph(name, url, command) {
const card = document.createElement('div');
card.className = 'graph-card';
card.setAttribute('data-graph-name', name);
const commandHtml = command ? `<div style="color: #858585; font-size: 12px; margin-bottom: 8px; font-family: 'Courier New', monospace; background: #1a1a1a; padding: 6px; border-radius: 3px; border-left: 3px solid #4a9eff;">$ ${escapeHtml(command)}</div>` : '';
card.innerHTML = `
<h3>${escapeHtml(name)}</h3>
${commandHtml}
<img src="${url}" alt="${escapeHtml(name)}"
onerror="this.parentElement.innerHTML='<p style=\\'color:#f48771\\'>Failed to load graph</p>'">
`;
graphsContainer.appendChild(card);
}
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
// Auto-execute file or code if provided in URL parameter
const urlParams = new URLSearchParams(window.location.search);
const autoRunFile = urlParams.get('file');
const autoRunCode = urlParams.get('code');
if (autoRunFile) {
console.log('Auto-running file from URL parameter:', autoRunFile);
// Run the file on page load
fetch('/v1/tools', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool: 'run_file',
parameters: { file_path: autoRunFile }
})
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
addOutputCell('Running file: ' + autoRunFile, data.result);
updateGraphs(data.result);
} else {
addError(data.message || 'Failed to run file');
}
})
.catch(error => {
addError('Error running file: ' + error.message);
});
} else if (autoRunCode) {
console.log('Auto-running code from URL parameter');
// Run the selected code on page load
fetch('/v1/tools', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool: 'run_selection',
parameters: { selection: autoRunCode }
})
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
addOutputCell('Running selection', data.result);
updateGraphs(data.result);
} else {
addError(data.message || 'Failed to run code');
}
})
.catch(error => {
addError('Error running code: ' + error.message);
});
}
commandInput.focus();
</script>
</body>
</html>
"""
    return Response(content=html_content, media_type="text/html")
def main():
"""Main function to set up and run the server"""
try:
# Get Stata path from arguments
parser = argparse.ArgumentParser(description='Stata MCP Server')
parser.add_argument('--stata-path', type=str, help='Path to Stata installation')
parser.add_argument('--port', type=int, default=4000, help='Port to run MCP server on')
parser.add_argument('--host', type=str, default='localhost', help='Host to bind the server to')
parser.add_argument('--log-level', type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='INFO', help='Logging level')
parser.add_argument('--force-port', action='store_true', help='Force the specified port, even if it requires killing processes')
parser.add_argument('--log-file', type=str, help='Path to log file (default: stata_mcp_server.log in current directory)')
parser.add_argument('--stata-edition', type=str, choices=['mp', 'se', 'be'], default='mp',
help='Stata edition to use (mp, se, be) - default: mp')
parser.add_argument('--log-file-location', type=str, choices=['extension', 'workspace', 'custom'], default='extension',
help='Location for .do file logs (extension, workspace, custom) - default: extension')
parser.add_argument('--custom-log-directory', type=str, default='',
help='Custom directory for .do file logs (when location is custom)')
# Special handling when running as a module
if is_running_as_module:
print(f"Command line arguments when running as module: {sys.argv}")
# When run as a module, the first arg won't be the script path
args_to_parse = sys.argv[1:]
else:
# Regular mode - arg 0 is script path
#print(f"[MCP Server] Original command line arguments: {sys.argv}")
args_to_parse = sys.argv
# Skip if an argument is a duplicate script path (e.g., on Windows with shell:true)
clean_args = []
script_path_found = False
for arg in args_to_parse:
# Skip duplicate script paths, but keep the first one (sys.argv[0])
if arg.endswith('stata_mcp_server.py'):
if script_path_found and arg != sys.argv[0]:
logging.debug(f"Skipping duplicate script path: {arg}")
continue
script_path_found = True
clean_args.append(arg)
args_to_parse = clean_args
# Process commands for Stata path with spaces
fixed_args = []
i = 0
while i < len(args_to_parse):
arg = args_to_parse[i]
if arg == '--stata-path' and i + 1 < len(args_to_parse):
# The next argument might be a path that got split
stata_path = args_to_parse[i + 1]
# Check if this is a quoted path
if (stata_path.startswith('"') and not stata_path.endswith('"')) or (stata_path.startswith("'") and not stata_path.endswith("'")):
# Look for the rest of the path in subsequent arguments
i += 2 # Move past '--stata-path' and the first part
# Get the quote character (single or double)
quote_char = stata_path[0]
path_parts = [stata_path[1:]] # Remove the starting quote
# Collect all parts until we find the end quote
while i < len(args_to_parse):
current = args_to_parse[i]
if current.endswith(quote_char):
# Found the end quote
path_parts.append(current[:-1]) # Remove the ending quote
break
else:
path_parts.append(current)
i += 1
# Join all parts to form the complete path
complete_path = " ".join(path_parts)
fixed_args.append('--stata-path')
fixed_args.append(complete_path)
else:
# Normal path handling (either without quotes or with properly matched quotes)
fixed_args.append(arg)
fixed_args.append(stata_path)
i += 2
else:
# For all other arguments, add them as-is
fixed_args.append(arg)
i += 1
# Print debug info
print(f"Command line arguments: {fixed_args}")
# Use the fixed arguments
args = parser.parse_args(fixed_args[1:] if fixed_args and not is_running_as_module else fixed_args)
print(f"Parsed arguments: stata_path={args.stata_path}, port={args.port}")
# Check if args.stata_path accidentally captured other arguments
if args.stata_path and ' --' in args.stata_path:
# The stata_path might have captured other arguments
parts = args.stata_path.split(' --')
# The first part is the actual stata_path
stata_path = parts[0].strip()
print(f"WARNING: Detected merged arguments in Stata path. Fixing: {args.stata_path} -> {stata_path}")
logging.warning(f"Fixed merged arguments in Stata path: {args.stata_path} -> {stata_path}")
args.stata_path = stata_path
# If Stata path was enclosed in quotes, remove them
if args.stata_path:
args.stata_path = args.stata_path.strip('"\'')
logging.debug(f"Cleaned Stata path: {args.stata_path}")
# Configure log file
log_file = args.log_file or 'stata_mcp_server.log'
log_dir = os.path.dirname(log_file)
# Create log directory if needed
if log_dir and not os.path.exists(log_dir):
try:
os.makedirs(log_dir, exist_ok=True)
print(f"Created log directory: {log_dir}")
except Exception as e:
print(f"ERROR: Failed to create log directory {log_dir}: {str(e)}")
# Continue anyway, the file handler creation will fail if needed
# Always print where we're trying to log
print(f"Logging to: {os.path.abspath(log_file)}")
# Remove existing handlers
for handler in logging.getLogger().handlers[:]:
logging.getLogger().removeHandler(handler)
# Add file handler
try:
file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(file_handler)
print(f"Successfully configured log file: {os.path.abspath(log_file)}")
except Exception as log_error:
print(f"ERROR: Failed to configure log file {log_file}: {str(log_error)}")
# Continue with console logging only
# Re-add console handler
logging.getLogger().addHandler(console_handler)
# Set log level
log_level = getattr(logging, args.log_level)
logging.getLogger().setLevel(log_level)
# Set Stata edition
global stata_edition, log_file_location, custom_log_directory, extension_path
stata_edition = args.stata_edition.lower()
log_file_location = args.log_file_location
custom_log_directory = args.custom_log_directory
# Try to determine extension path from the log file path
if args.log_file:
# If log file is in a logs subdirectory, the parent of that is the extension path
log_file_dir = os.path.dirname(os.path.abspath(args.log_file))
if log_file_dir.endswith('logs'):
extension_path = os.path.dirname(log_file_dir)
else:
extension_path = log_file_dir
logging.info(f"Using Stata {stata_edition.upper()} edition")
logging.info(f"Log file location setting: {log_file_location}")
if custom_log_directory:
logging.info(f"Custom log directory: {custom_log_directory}")
if extension_path:
logging.info(f"Extension path: {extension_path}")
# Log startup information
logging.info(f"Log initialized at {os.path.abspath(log_file)}")
logging.info(f"Log level set to {args.log_level}")
logging.info(f"Platform: {platform.system()} {platform.release()}")
logging.info(f"Python version: {sys.version}")
logging.info(f"Working directory: {os.getcwd()}")
# Set Stata path
global STATA_PATH
if args.stata_path:
# Strip quotes if present
STATA_PATH = args.stata_path.strip('"\'')
else:
STATA_PATH = os.environ.get('STATA_PATH')
if not STATA_PATH:
if platform.system() == 'Darwin': # macOS
STATA_PATH = '/Applications/Stata'
elif platform.system() == 'Windows':
# Try common Windows paths
potential_paths = [
'C:\\Program Files\\Stata18',
'C:\\Program Files\\Stata17',
'C:\\Program Files\\Stata16',
'C:\\Program Files (x86)\\Stata18',
'C:\\Program Files (x86)\\Stata17',
'C:\\Program Files (x86)\\Stata16'
]
for path in potential_paths:
if os.path.exists(path):
STATA_PATH = path
break
if not STATA_PATH:
STATA_PATH = 'C:\\Program Files\\Stata18' # Default if none found
else: # Linux
STATA_PATH = '/usr/local/stata'
logging.info(f"Using Stata path: {STATA_PATH}")
if not os.path.exists(STATA_PATH):
logging.error(f"Stata path does not exist: {STATA_PATH}")
print(f"ERROR: Stata path does not exist: {STATA_PATH}")
sys.exit(1)
# Check if the requested port is available
port = args.port
if args.force_port:
# Kill any existing process on the port
kill_process_on_port(port)
else:
# Always kill processes on port 4000
if port == 4000:
logging.info(f"Ensuring port 4000 is available by terminating any existing processes")
kill_process_on_port(port)
else:
# For other ports, check if available
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
result = s.connect_ex(('localhost', port))
if result == 0: # Port is in use
logging.warning(f"Port {port} is already in use")
# Kill the process on the port instead of finding a new one
logging.info(f"Attempting to kill process using port {port}")
kill_process_on_port(port)
# Try to initialize Stata
try_init_stata(STATA_PATH)
# Create and mount the MCP server
# Only expose run_selection and run_file to LLMs
# Other endpoints are still accessible via direct HTTP calls from VS Code extension
# Configure HTTP client with ASGI transport and extended timeout for long-running Stata operations
http_client = httpx.AsyncClient(
transport=httpx.ASGITransport(app=app, raise_app_exceptions=False),
base_url="http://apiserver",
timeout=1200.0 # 20 minutes timeout for long Stata operations
)
mcp = FastApiMCP(
app,
name=SERVER_NAME,
description="This server provides tools for running Stata commands and scripts. Use stata_run_selection for running code snippets and stata_run_file for executing .do files.",
http_client=http_client,
exclude_operations=[
"call_tool_v1_tools_post", # Legacy VS Code extension endpoint
"health_check_health_get", # Health check endpoint
"view_data_endpoint_view_data_get", # Data viewer endpoint (VS Code only)
"get_graph_graphs_graph_name_get", # Graph serving endpoint (VS Code only)
"clear_history_endpoint_clear_history_post", # History clearing (VS Code only)
"interactive_window_interactive_get", # Interactive window (VS Code only)
"stata_run_file_stream_endpoint_run_file_stream_get" # SSE streaming endpoint (HTTP clients only)
]
)
# Mount SSE transport at /mcp for backward compatibility
mcp.mount(mount_path="/mcp", transport="sse")
# ========================================================================
# HTTP (Streamable) Transport - Separate Server Instance
# ========================================================================
# Create a SEPARATE MCP server instance for HTTP to avoid session conflicts
# This ensures notifications go to the correct transport
from mcp.server import Server as MCPServer
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.responses import StreamingResponse as StarletteStreamingResponse
logging.info("Creating separate MCP server instance for HTTP transport...")
http_mcp_server = MCPServer(SERVER_NAME)
# Register list_tools handler to expose the same tools
@http_mcp_server.list_tools()
async def list_tools_http():
    """Advertise the Stata tools on the dedicated HTTP MCP server.

    The two tool definitions (``stata_run_selection`` and ``stata_run_file``)
    are spelled out literally here; nothing is read from the fastapi_mcp
    server object at call time.

    Returns:
        A list with one ``mcp.types.Tool`` per exposed tool.
    """
    from mcp.types import Tool

    def _object_schema(title, properties, required):
        # Key order is kept stable so the serialized schema does not change.
        return {
            "type": "object",
            "properties": properties,
            "title": title,
            "required": required,
        }

    run_selection_tool = Tool(
        name="stata_run_selection",
        description="Stata Run Selection Endpoint\n\nRun selected Stata code and return the output\n\n### Responses:\n\n**200**: Successful Response (Success Response)",
        inputSchema=_object_schema(
            "stata_run_selectionArguments",
            {"selection": {"type": "string", "title": "selection"}},
            ["selection"],
        ),
    )

    run_file_tool = Tool(
        name="stata_run_file",
        description="Stata Run File Endpoint\n\nRun a Stata .do file and return the output (MCP-compatible endpoint)\n\nArgs:\n file_path: Path to the .do file\n timeout: Timeout in seconds (default: 600 seconds / 10 minutes)\n\nReturns:\n Response with plain text output\n\n### Responses:\n\n**200**: Successful Response (Success Response)",
        inputSchema=_object_schema(
            "stata_run_fileArguments",
            {
                "file_path": {"type": "string", "title": "file_path"},
                "timeout": {"type": "integer", "default": 600, "title": "timeout"},
            },
            ["file_path"],
        ),
    )

    return [run_selection_tool, run_file_tool]
# Register call_tool handler to execute tools with HTTP server's context
@http_mcp_server.call_tool()
async def call_tool_http(name: str, arguments: dict) -> list:
    """Execute a tool requested over the HTTP (streamable) transport.

    The call is forwarded to ``mcp._execute_api_tool`` using the shared
    in-process ``http_client`` and the fastapi_mcp ``operation_map``. The
    attribute is looked up at call time, so whatever implementation is
    installed on ``mcp`` when the request arrives is the one that runs.

    Args:
        name: Name of the tool to run.
        arguments: Tool arguments as supplied by the MCP client.

    Returns:
        The result produced by ``mcp._execute_api_tool``.
    """
    logging.debug(f"HTTP server executing tool: {name}")
    # No originating HTTP request is available on this path, hence
    # http_request_info=None.
    return await mcp._execute_api_tool(
        client=http_client,
        tool_name=name,
        arguments=arguments,
        operation_map=mcp.operation_map,
        http_request_info=None,
    )
logging.debug("Registered tool handlers with HTTP server")
# Create HTTP session manager with dedicated server
http_session_manager = StreamableHTTPSessionManager(
app=http_mcp_server, # Use dedicated HTTP server, not shared
event_store=None,
json_response=False, # Use SSE format for responses
stateless=False, # Maintain session state
)
logging.info("HTTP transport configured with dedicated MCP server")
# Create a custom Response class that properly handles ASGI streaming
class ASGIPassthroughResponse(StarletteStreamingResponse):
    """Response object that hands the request to a raw ASGI callable.

    Instead of streaming its own content, the response invokes the stored
    ASGI handler with the framework-provided ``send`` callback, so the
    handler writes to the client directly.
    """

    def __init__(self, asgi_handler, scope, receive):
        """Remember the handler plus the scope/receive pair to replay to it.

        Args:
            asgi_handler: ASGI callable invoked when the response is sent.
            scope: ASGI scope passed to the handler.
            receive: ASGI receive callable passed to the handler.
        """
        # The empty iterator is a placeholder: the parent constructor is run
        # only so the attributes it normally sets up exist on this instance.
        super().__init__(content=iter([]), media_type="text/event-stream")
        self.asgi_handler, self.scope_data, self.receive_func = (
            asgi_handler,
            scope,
            receive,
        )

    async def __call__(self, scope, receive, send):
        """Run the stored handler with the stored scope/receive and live ``send``.

        The ``scope`` and ``receive`` arguments of this call are not used; the
        values captured at construction time are passed instead.
        """
        handler = self.asgi_handler
        await handler(self.scope_data, self.receive_func, send)
@app.api_route(
    "/mcp-streamable",
    methods=["GET", "POST", "DELETE"],
    include_in_schema=False,
    operation_id="mcp_http_streamable"
)
async def handle_mcp_streamable(request: Request):
    """Route MCP Streamable HTTP traffic to the HTTP session manager.

    The returned response object calls ``http_session_manager.handle_request``
    itself, using this request's ASGI scope and receive callable.
    """
    passthrough = ASGIPassthroughResponse(
        http_session_manager.handle_request,
        request.scope,
        request.receive,
    )
    return passthrough
# Store the session manager for startup/shutdown
app.state.http_session_manager = http_session_manager
app.state.http_session_manager_cm = None
# Define startup handler for the HTTP session manager
async def _start_http_session_manager():
    """Enter the session manager's ``run()`` context and keep it on app.state.

    The context manager object is stored in
    ``app.state.http_session_manager_cm`` so it can be exited later.

    Raises:
        Exception: Any failure while starting is logged and re-raised.
    """
    try:
        logging.info("Starting StreamableHTTP session manager...")
        manager_cm = http_session_manager.run()
        # Stored before entering, matching the order callers may rely on.
        app.state.http_session_manager_cm = manager_cm
        await manager_cm.__aenter__()
        logging.info("✓ StreamableHTTP session manager started successfully")
    except Exception as e:
        logging.exception(f"Failed to start StreamableHTTP session manager: {e}")
        raise
# Define shutdown handler for the HTTP session manager
async def _stop_http_session_manager():
    """Exit the session manager context stored on app.state, if there is one.

    Errors raised while exiting are logged and not propagated.
    """
    manager_cm = app.state.http_session_manager_cm
    if not manager_cm:
        # Nothing was started (or nothing was stored), so nothing to stop.
        return
    try:
        logging.info("Stopping StreamableHTTP session manager...")
        await manager_cm.__aexit__(None, None, None)
        logging.info("✓ StreamableHTTP session manager stopped")
    except Exception as e:
        logging.exception(f"Error stopping HTTP session manager: {e}")
# Store handlers on app.state for the lifespan manager to call
app.state._http_session_manager_starter = _start_http_session_manager
app.state._http_session_manager_stopper = _stop_http_session_manager
logging.debug("HTTP session manager startup/shutdown handlers registered with lifespan")
# Store reference
mcp._http_transport = http_session_manager
logging.info("MCP HTTP Streamable transport mounted at /mcp-streamable with TRUE SSE streaming (ASGI direct)")
LOG_LEVEL_RANK = {
"debug": 0,
"info": 1,
"notice": 2,
"warning": 3,
"error": 4,
"critical": 5,
"alert": 6,
"emergency": 7,
}
DEFAULT_LOG_LEVEL = "notice"
@mcp.server.set_logging_level()
async def handle_set_logging_level(level: str):
    """Persist the client-requested log level on the current session.

    The level is lower-cased (falling back to "info" when empty) and stored
    on the session object as ``_stata_log_level``. If there is no request
    context or no session, nothing is stored.

    Args:
        level: Log level name sent by the client.
    """
    try:
        ctx = mcp.server.request_context
    except LookupError:
        logging.debug("logging/setLevel received outside of request context")
        return

    session = getattr(ctx, "session", None)
    if session is None:
        # Previously this path still logged that the level had been set.
        logging.debug("logging/setLevel received without a session; level not stored")
        return

    normalized_level = (level or "info").lower()
    setattr(session, "_stata_log_level", normalized_level)
    # Report the value that was actually stored, not the raw input.
    logging.debug(f"Set MCP log level for session to {normalized_level}")
# Enhance stata_run_file with MCP-native streaming updates
original_execute = mcp._execute_api_tool
async def execute_with_streaming(*call_args, **call_kwargs):
    """Wrap tool execution to stream progress for long-running Stata jobs.

    Intended to be bound as a method, so the first positional argument is the
    bound instance. Calls for any tool other than ``stata_run_file`` (or calls
    where ``client`` / ``operation_map`` cannot be resolved) are forwarded
    unchanged to ``original_execute``. For ``stata_run_file`` the original call
    runs as a background task while this coroutine periodically sends MCP log
    and progress notifications, including the last lines newly appended to the
    Stata log file.

    Args:
        *call_args: The bound instance, optionally followed by positional
            values for ``client``, ``tool_name``, ``arguments``,
            ``operation_map`` and ``http_request_info`` (in that order).
        **call_kwargs: The same values by keyword. Keywords take precedence;
            unknown keywords are logged and dropped on the streaming path.

    Returns:
        Whatever ``original_execute`` returns.

    Raises:
        TypeError: If called without the bound instance.
        Exception: Errors from the wrapped execution are logged, reported to
            the client as an error log message, and re-raised.
    """
    if not call_args:
        raise TypeError("execute_with_streaming requires bound 'self'")
    bound_self, *original_args = call_args

    # Resolve the known parameters: keyword values first, then fill any that
    # are still None from the positional arguments, in declaration order.
    known_names = ("client", "tool_name", "arguments", "operation_map", "http_request_info")
    params = {key: call_kwargs.get(key) for key in known_names}
    for extra_key, extra_val in call_kwargs.items():
        if extra_key not in params:
            # Logged and discarded to stay forwards-compatible.
            logging.debug(f"Ignoring unexpected MCP execute kwarg: {extra_key}={extra_val!r}")
    remaining = list(original_args)
    for key in known_names:
        if params[key] is None and remaining:
            params[key] = remaining.pop(0)
    client = params["client"]
    tool_name = params["tool_name"]
    arguments = params["arguments"]
    operation_map = params["operation_map"]
    http_request_info = params["http_request_info"]

    # If not our tool or required data missing, fall back to the original
    # implementation with the caller's arguments exactly as received.
    if tool_name != "stata_run_file" or client is None or operation_map is None:
        return await original_execute(*original_args, **call_kwargs)

    arguments_dict = dict(arguments or {})

    def run_original():
        """Return the coroutine for the original call with normalized kwargs."""
        return original_execute(
            client=client,
            tool_name=tool_name,
            arguments=arguments_dict,
            operation_map=operation_map,
            http_request_info=http_request_info,
        )

    # Try to get request context from either HTTP or SSE server.
    # IMPORTANT: Check HTTP first! If we check SSE first, we might get stale
    # SSE context even when the request came through HTTP.
    try:
        ctx = http_mcp_server.request_context
        server_type = "HTTP"
        logging.debug(f"Using HTTP server request context: {ctx}")
    except (LookupError, NameError):
        # HTTP server has no context (or is not defined); try the SSE server.
        try:
            ctx = bound_self.server.request_context
            server_type = "SSE"
            logging.debug(f"Using SSE server request context: {ctx}")
        except LookupError:
            logging.debug("No MCP request context available; skipping streaming wrapper")
            return await run_original()

    session = getattr(ctx, "session", None)
    request_id = getattr(ctx, "request_id", None)
    progress_token = getattr(getattr(ctx, "meta", None), "progressToken", None)

    # DEBUG: Log session information
    logging.info(f"✓ Streaming enabled via {server_type} server - Tool: {tool_name}")
    if session:
        session_attrs = [attr for attr in dir(session) if not attr.startswith('__')]
        logging.debug(f"Session type: {type(session)}, Attributes: {session_attrs[:10]}")
        session_id = getattr(session, "_session_id", getattr(session, "session_id", getattr(session, "id", None)))
    else:
        session_id = None
    logging.debug(f"Tool execution - Server: {server_type}, Session ID: {session_id}, Request ID: {request_id}, Progress Token: {progress_token}")

    if session is None:
        logging.debug("MCP session not available; falling back to default execution")
        return await run_original()

    if not hasattr(session, "_stata_log_level"):
        setattr(session, "_stata_log_level", DEFAULT_LOG_LEVEL)

    file_path = arguments_dict.get("file_path", "")
    try:
        timeout = int(arguments_dict.get("timeout", 600))
    except (TypeError, ValueError):
        timeout = 600

    resolved_path, resolution_candidates = resolve_do_file_path(file_path)
    effective_path = resolved_path or os.path.abspath(file_path)
    base_name = os.path.splitext(os.path.basename(effective_path))[0]
    log_file_path = get_log_file_path(effective_path, base_name)

    logging.info(f"📡 MCP streaming enabled for {os.path.basename(file_path)}")
    logging.debug(f"MCP log streaming monitoring: {log_file_path}")
    if not resolved_path:
        logging.debug(f"Resolution attempts: {resolution_candidates}")

    import asyncio as _asyncio
    import time as _time

    async def send_log(level: str, message: str):
        """Send an MCP log message unless it is below the session's level."""
        level = (level or "info").lower()
        session_level = getattr(session, "_stata_log_level", DEFAULT_LOG_LEVEL)
        if LOG_LEVEL_RANK.get(level, 0) < LOG_LEVEL_RANK.get(session_level, LOG_LEVEL_RANK[DEFAULT_LOG_LEVEL]):
            return
        logging.debug(f"MCP streaming log [{level}] (session level {session_level}): {message}")
        try:
            await session.send_log_message(
                level=level,
                data=message,
                logger="stata-mcp",
                related_request_id=request_id,
            )
        except Exception as send_exc:  # noqa: BLE001 - notifications are best-effort
            logging.debug(f"Unable to send MCP log message: {send_exc}")

    async def send_progress(elapsed: float, message: str | None = None):
        """Send an MCP progress notification if the client supplied a token."""
        if progress_token is None:
            return
        try:
            await session.send_progress_notification(
                progress_token=progress_token,
                progress=elapsed,
                total=timeout,
                message=message,
                related_request_id=request_id,
            )
        except Exception as send_exc:  # noqa: BLE001 - notifications are best-effort
            logging.debug(f"Unable to send MCP progress notification: {send_exc}")

    def read_new_log_output(offset: int) -> tuple[str, int]:
        """Return the text appended to the log since ``offset`` and the new offset.

        The file is read in binary mode so offsets are exact byte positions.
        If the file is now shorter than ``offset`` it has been truncated or
        recreated since the last read, so reading restarts from the beginning
        instead of seeking past the end and silently returning nothing.
        """
        with open(log_file_path, "rb") as log_file:
            log_file.seek(0, os.SEEK_END)
            if log_file.tell() < offset:
                offset = 0
            log_file.seek(offset)
            raw = log_file.read()
            return raw.decode("utf-8", errors="replace"), log_file.tell()

    task = _asyncio.create_task(run_original())

    start_time = _time.time()
    stream_interval = 5
    poll_interval = 2
    last_stream = 0.0
    last_offset = 0

    start_message = f"▶️ Starting Stata execution: {os.path.basename(effective_path)}"

    try:
        await send_log("notice", start_message)
        await send_progress(0.0, start_message)

        while not task.done():
            # Wait on the task itself rather than sleeping a fixed interval,
            # so a finished job is returned immediately instead of up to
            # poll_interval seconds late. asyncio.wait does not raise on
            # timeout and does not cancel the task.
            await _asyncio.wait({task}, timeout=poll_interval)
            if task.done():
                break

            now = _time.time()
            elapsed = now - start_time
            if now - last_stream < stream_interval:
                continue
            last_stream = now

            progress_msg = f"⏱️ {elapsed:.0f}s elapsed / {timeout}s timeout"
            await send_progress(elapsed, progress_msg)

            if not os.path.exists(log_file_path):
                await send_log("notice", f"{progress_msg} (initializing...)")
                continue

            await send_log(
                "notice",
                f"{progress_msg}\n\n(📁 Inspecting Stata log for new output...)",
            )
            try:
                new_content, last_offset = read_new_log_output(last_offset)
            except OSError as read_exc:
                logging.debug(f"Error reading log for streaming: {read_exc}")
                await send_log("notice", f"{progress_msg} (waiting for output...)")
                continue

            lines = new_content.strip().splitlines()
            if lines:
                # Only the last few new lines are forwarded to the client.
                snippet = "\n".join(lines[-3:])
                await send_log("notice", f"{progress_msg}\n\n📝 Recent output:\n{snippet}")

        result = await task
        total_time = _time.time() - start_time
        await send_log("notice", f"✅ Execution completed in {total_time:.1f}s")
        return result
    except Exception as exc:
        logging.error(f"❌ Error during MCP streaming: {exc}", exc_info=True)
        await send_log("error", f"Error during execution: {exc}")
        raise
    finally:
        # If this wrapper exits while the job is still running (for example
        # because it was cancelled while polling), cancel the background task
        # so it is not left orphaned.
        if not task.done():
            task.cancel()
import types as _types
mcp._execute_api_tool = _types.MethodType(execute_with_streaming, mcp)
logging.info("📡 MCP streaming wrapper installed for stata_run_file")
# Mark MCP as initialized (will also be set in startup event)
global mcp_initialized
mcp_initialized = True
logging.info("MCP server mounted and initialized")
try:
# Start the server
logging.info(f"Starting Stata MCP Server on {args.host}:{port}")
logging.info(f"Stata available: {stata_available}")
# Print to stdout as well to ensure visibility
if platform.system() == 'Windows':
# For Windows, completely skip the startup message if another instance is detected
# as we already printed information above
if not stata_banner_displayed:
print(f"INITIALIZATION SUCCESS: Stata MCP Server starting on {args.host}:{port}")
print(f"Stata available: {stata_available}")
print(f"Log file: {os.path.abspath(log_file)}")
else:
# Normal behavior for macOS/Linux
print(f"INITIALIZATION SUCCESS: Stata MCP Server starting on {args.host}:{port}")
print(f"Stata available: {stata_available}")
print(f"Log file: {os.path.abspath(log_file)}")
import uvicorn
uvicorn.run(
app,
host=args.host,
port=port,
log_level="warning", # Use warning to allow important messages through
access_log=False # Disable access logs
)
except Exception as e:
logging.error(f"Server error: {str(e)}")
traceback.print_exc()
sys.exit(1)
except Exception as e:
logging.error(f"Error in main function: {str(e)}")
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
```