# Directory Structure ``` ├── .gitignore ├── CHANGELOG.md ├── docker-compose.yml ├── Dockerfile ├── etc │ ├── catalog │ │ ├── bullshit.properties │ │ └── memory.properties │ ├── config.properties │ ├── jvm.config │ └── node.properties ├── examples │ └── simple_mcp_query.py ├── LICENSE ├── llm_query_trino.py ├── llm_trino_api.py ├── load_bullshit_data.py ├── openapi.json ├── pyproject.toml ├── pytest.ini ├── README.md ├── requirements-dev.txt ├── run_tests.sh ├── scripts │ ├── docker_stdio_test.py │ ├── fix_trino_session.py │ ├── test_direct_query.py │ ├── test_fixed_client.py │ ├── test_messages.py │ ├── test_quick_query.py │ └── test_stdio_trino.py ├── src │ └── trino_mcp │ ├── __init__.py │ ├── config.py │ ├── resources │ │ └── __init__.py │ ├── server.py │ ├── tools │ │ └── __init__.py │ └── trino_client.py ├── test_bullshit_query.py ├── test_llm_api.py ├── test_mcp_stdio.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── integration │ │ └── __init__.py │ └── test_client.py ├── tools │ ├── create_bullshit_data.py │ ├── run_queries.sh │ ├── setup │ │ ├── setup_data.sh │ │ └── setup_tables.sql │ └── setup_bullshit_table.py └── trino-conf ├── catalog │ └── memory.properties ├── config.properties ├── jvm.config └── node.properties ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python __pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg # Virtual environments venv/ env/ ENV/ # Temp temp/ # IDE files .idea/ .vscode/ .cursor/ *.swp *.swo # Logs and databases *.log *.sqlite3 logs/ # Project-specific data/ archive/ # OS specific files .DS_Store Thumbs.db ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Trino MCP Server Model Context Protocol server for Trino, providing AI models with structured access to Trino's distributed SQL query engine. ⚠️ **BETA RELEASE (v0.1.2)** ⚠️ This project is stabilizing with core features working and tested. Feel free to fork and contribute! ## Features - ✅ Fixed Docker container API initialization issue! (reliable server initalization) - ✅ Exposes Trino resources through MCP protocol - ✅ Enables AI tools to query and analyze data in Trino - ✅ Provides transport options (STDIO transport works reliably; SSE transport has issues) - ✅ Fixed catalog handling for proper Trino query execution - ✅ Both Docker container API and standalone Python API server options ## Quick Start ```bash # Start the server with docker-compose docker-compose up -d # Verify the API is working curl -X POST "http://localhost:9097/api/query" \ -H "Content-Type: application/json" \ -d '{"query": "SELECT 1 AS test"}' ``` Need a non-containerized version? Run the standalone API: ```bash # Run the standalone API server on port 8008 python llm_trino_api.py ``` ## LLM Integration Want to give an LLM direct access to query your Trino instance? We've created simple tools for that! ### Command-Line LLM Interface The simplest way to let an LLM query Trino is through our command-line tool: ```bash # Simple direct query (perfect for LLMs) python llm_query_trino.py "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5" # Specify a different catalog or schema python llm_query_trino.py "SELECT * FROM information_schema.tables" memory information_schema ``` ### REST API for LLMs We offer two API options for integration with LLM applications: #### 1. Docker Container API (Port 9097) The Docker container exposes a REST API on port 9097: ```bash # Execute a query against the Docker container API curl -X POST "http://localhost:9097/api/query" \ -H "Content-Type: application/json" \ -d '{"query": "SELECT 1 AS test"}' ``` #### 2. Standalone Python API (Port 8008) For more flexible deployments, run the standalone API server: ```bash # Start the API server on port 8008 python llm_trino_api.py ``` This creates endpoints at: - `GET http://localhost:8008/` - API usage info - `POST http://localhost:8008/query` - Execute SQL queries You can then have your LLM make HTTP requests to this endpoint: ```python # Example code an LLM might generate import requests def query_trino(sql_query): response = requests.post( "http://localhost:8008/query", json={"query": sql_query} ) return response.json() # LLM-generated query results = query_trino("SELECT job_title, AVG(salary) FROM memory.bullshit.real_bullshit_data GROUP BY job_title ORDER BY AVG(salary) DESC LIMIT 5") print(results["formatted_results"]) ``` This approach allows LLMs to focus on generating SQL, while our tools handle all the MCP protocol complexity! ## Demo and Validation Scripts 🚀 We've created some badass demo scripts that show how AI models can use the MCP protocol to run complex queries against Trino: ### 1. Bullshit Data Generation and Loading The `tools/create_bullshit_data.py` script generates a dataset of 10,000 employees with ridiculous job titles, inflated salaries, and a "bullshit factor" rating (1-10): ```bash # Generate the bullshit data python tools/create_bullshit_data.py # Load the bullshit data into Trino's memory catalog python load_bullshit_data.py ``` ### 2. Running Complex Queries through MCP The `test_bullshit_query.py` script demonstrates end-to-end MCP interaction: - Connects to the MCP server using STDIO transport - Initializes the protocol following the MCP spec - Runs a complex SQL query with WHERE, GROUP BY, HAVING, ORDER BY - Processes and formats the results ```bash # Run a complex query against the bullshit data through MCP python test_bullshit_query.py ``` Example output showing top BS jobs with high salaries: ``` 🏆 TOP 10 BULLSHIT JOBS (high salary, high BS factor): ---------------------------------------------------------------------------------------------------- JOB_TITLE | COUNT | AVG_SALARY | MAX_SALARY | AVG_BS_FACTOR ---------------------------------------------------------------------------------------------------- Advanced Innovation Jedi | 2 | 241178.50 | 243458.00 | 7.50 VP of Digital Officer | 1 | 235384.00 | 235384.00 | 7.00 Innovation Technical Architect | 1 | 235210.00 | 235210.00 | 9.00 ...and more! ``` ### 3. API Testing The `test_llm_api.py` script validates the API functionality: ```bash # Test the Docker container API python test_llm_api.py ``` This performs a comprehensive check of: - API endpoint discovery - Documentation availability - Valid query execution - Error handling for invalid queries ## Usage ```bash # Start the server with docker-compose docker-compose up -d ``` The server will be available at: - Trino: http://localhost:9095 - MCP server: http://localhost:9096 - API server: http://localhost:9097 ## Client Connection ✅ **IMPORTANT**: The client scripts run on your local machine (OUTSIDE Docker) and connect TO the Docker containers. The scripts automatically handle this by using docker exec commands. You don't need to be inside the container to use MCP! Running tests from your local machine: ```bash # Generate and load data into Trino python tools/create_bullshit_data.py # Generates data locally python load_bullshit_data.py # Loads data to Trino in Docker # Run MCP query through Docker python test_bullshit_query.py # Queries using MCP in Docker ``` ## Transport Options This server supports two transport methods, but only STDIO is currently reliable: ### STDIO Transport (Recommended and Working) STDIO transport works reliably and is currently the only recommended method for testing and development: ```bash # Run with STDIO transport inside the container docker exec -i trino_mcp_trino-mcp_1 python -m trino_mcp.server --transport stdio --debug --trino-host trino --trino-port 8080 --trino-user trino --trino-catalog memory ``` ### SSE Transport (NOT RECOMMENDED - Has Critical Issues) SSE is the default transport in MCP but has serious issues with the current MCP 1.3.0 version, causing server crashes on client disconnections. **Not recommended for use until these issues are resolved**: ```bash # NOT RECOMMENDED: Run with SSE transport (crashes on disconnection) docker exec trino_mcp_trino-mcp_1 python -m trino_mcp.server --transport sse --host 0.0.0.0 --port 8000 --debug ``` ## Known Issues and Fixes ### Fixed: Docker Container API Initialization ✅ **FIXED**: We've resolved an issue where the API in the Docker container returned 503 Service Unavailable responses. The problem was with the `app_lifespan` function not properly initializing the `app_context_global` and Trino client connection. The fix ensures that: 1. The Trino client explicitly connects during startup 2. The AppContext global variable is properly initialized 3. Health checks now work correctly If you encounter 503 errors, check that your container has been rebuilt with the latest code: ```bash # Rebuild and restart the container with the fix docker-compose stop trino-mcp docker-compose rm -f trino-mcp docker-compose up -d trino-mcp ``` ### MCP 1.3.0 SSE Transport Crashes There's a critical issue with MCP 1.3.0's SSE transport that causes server crashes when clients disconnect. Until a newer MCP version is integrated, use STDIO transport exclusively. The error manifests as: ``` RuntimeError: generator didn't stop after athrow() anyio.BrokenResourceError ``` ### Trino Catalog Handling We fixed an issue with catalog handling in the Trino client. The original implementation attempted to use `USE catalog` statements, which don't work reliably. The fix directly sets the catalog in the connection parameters. ## Project Structure This project is organized as follows: - `src/` - Main source code for the Trino MCP server - `examples/` - Simple examples showing how to use the server - `scripts/` - Useful diagnostic and testing scripts - `tools/` - Utility scripts for data creation and setup - `tests/` - Automated tests Key files: - `llm_trino_api.py` - Standalone API server for LLM integration - `test_llm_api.py` - Test script for the API server - `test_mcp_stdio.py` - Main test script using STDIO transport (recommended) - `test_bullshit_query.py` - Complex query example with bullshit data - `load_bullshit_data.py` - Script to load generated data into Trino - `tools/create_bullshit_data.py` - Script to generate hilarious test data - `run_tests.sh` - Script to run automated tests - `examples/simple_mcp_query.py` - Simple example to query data using MCP ## Development **IMPORTANT**: All scripts can be run from your local machine - they'll automatically communicate with the Docker containers via docker exec commands! ```bash # Install development dependencies pip install -e ".[dev]" # Run automated tests ./run_tests.sh # Test MCP with STDIO transport (recommended) python test_mcp_stdio.py # Simple example query python examples/simple_mcp_query.py "SELECT 'Hello World' AS message" ``` ## Testing To test that Trino queries are working correctly, use the STDIO transport test script: ```bash # Recommended test method (STDIO transport) python test_mcp_stdio.py ``` For more complex testing with the bullshit data: ```bash # Load and query the bullshit data (shows the full power of Trino MCP!) python load_bullshit_data.py python test_bullshit_query.py ``` For testing the LLM API endpoint: ```bash # Test the Docker container API python test_llm_api.py # Test the standalone API (make sure it's running first) python llm_trino_api.py curl -X POST "http://localhost:8008/query" \ -H "Content-Type: application/json" \ -d '{"query": "SELECT 1 AS test"}' ``` ## How LLMs Can Use This LLMs can use the Trino MCP server to: 1. **Get Database Schema Information**: ```python # Example prompt to LLM: "What schemas are available in the memory catalog?" # LLM can generate code to query: query = "SHOW SCHEMAS FROM memory" ``` 2. **Run Complex Analytical Queries**: ```python # Example prompt: "Find the top 5 job titles with highest average salaries" # LLM can generate complex SQL: query = """ SELECT job_title, AVG(salary) as avg_salary FROM memory.bullshit.real_bullshit_data GROUP BY job_title ORDER BY avg_salary DESC LIMIT 5 """ ``` 3. **Perform Data Analysis and Present Results**: ```python # LLM can parse the response, extract insights and present to user: "The highest paying job title is 'Advanced Innovation Jedi' with an average salary of $241,178.50" ``` ### Real LLM Analysis Example: Bullshit Jobs by Company Here's a real example of what an LLM could produce when asked to "Identify the companies with the most employees in bullshit jobs and create a Mermaid chart": #### Step 1: LLM generates and runs the query ```sql SELECT company, COUNT(*) as employee_count, AVG(bullshit_factor) as avg_bs_factor FROM memory.bullshit.real_bullshit_data WHERE bullshit_factor > 7 GROUP BY company ORDER BY employee_count DESC, avg_bs_factor DESC LIMIT 10 ``` #### Step 2: LLM gets and analyzes the results ``` COMPANY | EMPLOYEE_COUNT | AVG_BS_FACTOR ---------------------------------------- Unknown Co | 2 | 9.0 BitEdge | 1 | 10.0 CyberWare | 1 | 10.0 BitLink | 1 | 10.0 AlgoMatrix | 1 | 10.0 CryptoHub | 1 | 10.0 BitGrid | 1 | 10.0 MLStream | 1 | 10.0 CloudCube | 1 | 10.0 UltraEdge | 1 | 10.0 ``` #### Step 3: LLM generates a Mermaid chart visualization ```mermaid %%{init: {'theme': 'forest'}}%% graph LR title[Companies with Most Bullshit Jobs] style title fill:#333,stroke:#333,stroke-width:1px,color:white,font-weight:bold,font-size:18px Companies --> UnknownCo[Unknown Co] Companies --> BitEdge[BitEdge] Companies --> CyberWare[CyberWare] Companies --> BitLink[BitLink] Companies --> AlgoMatrix[AlgoMatrix] Companies --> CryptoHub[CryptoHub] Companies --> BitGrid[BitGrid] Companies --> MLStream[MLStream] Companies --> CloudCube[CloudCube] Companies --> UltraEdge[UltraEdge] UnknownCo --- Count2[2 employees] BitEdge --- Count1a[1 employee] CyberWare --- Count1b[1 employee] BitLink --- Count1c[1 employee] AlgoMatrix --- Count1d[1 employee] CryptoHub --- Count1e[1 employee] BitGrid --- Count1f[1 employee] MLStream --- Count1g[1 employee] CloudCube --- Count1h[1 employee] UltraEdge --- Count1i[1 employee] classDef company fill:#ff5733,stroke:#333,stroke-width:1px,color:white,font-weight:bold; classDef count fill:#006100,stroke:#333,stroke-width:1px,color:white,font-weight:bold; class UnknownCo,BitEdge,CyberWare,BitLink,AlgoMatrix,CryptoHub,BitGrid,MLStream,CloudCube,UltraEdge company; class Count2,Count1a,Count1b,Count1c,Count1d,Count1e,Count1f,Count1g,Count1h,Count1i count; ``` **Alternative Bar Chart:** ```mermaid %%{init: {'theme': 'default'}}%% pie showData title Companies with Bullshit Jobs "Unknown Co (BS: 9.0)" : 2 "BitEdge (BS: 10.0)" : 1 "CyberWare (BS: 10.0)" : 1 "BitLink (BS: 10.0)" : 1 "AlgoMatrix (BS: 10.0)" : 1 "CryptoHub (BS: 10.0)" : 1 "BitGrid (BS: 10.0)" : 1 "MLStream (BS: 10.0)" : 1 "CloudCube (BS: 10.0)" : 1 "UltraEdge (BS: 10.0)" : 1 ``` #### Step 4: LLM provides key insights The LLM can analyze the data and provide insights: - "Unknown Co" has the most employees in bullshit roles (2), while all others have just one - Most companies have achieved a perfect 10.0 bullshit factor score - Tech-focused companies (BitEdge, CyberWare, etc.) seem to create particularly meaningless roles - Bullshit roles appear concentrated at executive or specialized position levels This example demonstrates how an LLM can: 1. Generate appropriate SQL queries based on natural language questions 2. Process and interpret the results from Trino 3. Create visual representations of the data 4. Provide meaningful insights and analysis ## Accessing the API The Trino MCP server now includes two API options for accessing data: ### 1. Docker Container API (Port 9097) ```python import requests import json # API endpoint (default port 9097 in Docker setup) api_url = "http://localhost:9097/api/query" # Define your SQL query query_data = { "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5", "catalog": "memory", "schema": "bullshit" } # Send the request response = requests.post(api_url, json=query_data) results = response.json() # Process the results if results["success"]: print(f"Query returned {results['results']['row_count']} rows") for row in results['results']['rows']: print(row) else: print(f"Query failed: {results['message']}") ``` ### 2. Standalone Python API (Port 8008) ```python # Same code as above, but with different port api_url = "http://localhost:8008/query" ``` Both APIs offer the following endpoints: - `GET /api` - API documentation and usage examples - `POST /api/query` - Execute SQL queries against Trino These APIs eliminate the need for wrapper scripts and let LLMs query Trino directly using REST calls, making it much simpler to integrate with services like Claude, GPT, and other AI systems. ## Troubleshooting ### API Returns 503 Service Unavailable If the Docker container API returns 503 errors: 1. Make sure you've rebuilt the container with the latest code: ```bash docker-compose stop trino-mcp docker-compose rm -f trino-mcp docker-compose up -d trino-mcp ``` 2. Check the container logs for errors: ```bash docker logs trino_mcp_trino-mcp_1 ``` 3. Verify that Trino is running properly: ```bash curl -s http://localhost:9095/v1/info | jq ``` ### Port Conflicts with Standalone API The standalone API defaults to port 8008 to avoid conflicts. If you see an "address already in use" error: 1. Edit `llm_trino_api.py` and change the port number in the last line: ```python uvicorn.run(app, host="127.0.0.1", port=8008) ``` 2. Run with a custom port via command line: ```bash python -c "import llm_trino_api; import uvicorn; uvicorn.run(llm_trino_api.app, host='127.0.0.1', port=8009)" ``` ## Future Work This is now in beta with these improvements planned: - [ ] Integrate with newer MCP versions when available to fix SSE transport issues - [ ] Add/Validate support for Hive, JDBC, and other connectors - [ ] Add more comprehensive query validation across different types and complexities - [ ] Implement support for more data types and advanced Trino features - [ ] Improve error handling and recovery mechanisms - [ ] Add user authentication and permission controls - [ ] Create more comprehensive examples and documentation - [ ] Develop admin monitoring and management interfaces - [ ] Add performance metrics and query optimization hints - [ ] Implement support for long-running queries and result streaming --- *Developed by Stink Labs, 2025* ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python """ Test package for the Trino MCP server. """ ``` -------------------------------------------------------------------------------- /etc/catalog/memory.properties: -------------------------------------------------------------------------------- ``` connector.name=memory memory.max-data-per-node=512MB ``` -------------------------------------------------------------------------------- /trino-conf/catalog/memory.properties: -------------------------------------------------------------------------------- ``` connector.name=memory memory.max-data-per-node=512MB ``` -------------------------------------------------------------------------------- /tests/integration/__init__.py: -------------------------------------------------------------------------------- ```python """ Integration test package for the Trino MCP server. """ ``` -------------------------------------------------------------------------------- /etc/config.properties: -------------------------------------------------------------------------------- ``` coordinator=true node-scheduler.include-coordinator=true http-server.http.port=8080 discovery.uri=http://localhost:8080 ``` -------------------------------------------------------------------------------- /trino-conf/config.properties: -------------------------------------------------------------------------------- ``` coordinator=true node-scheduler.include-coordinator=true http-server.http.port=8080 discovery.uri=http://localhost:8080 ``` -------------------------------------------------------------------------------- /etc/node.properties: -------------------------------------------------------------------------------- ``` node.environment=production node.data-dir=/data/trino node.server-log-file=/var/log/trino/server.log node.launcher-log-file=/var/log/trino/launcher.log ``` -------------------------------------------------------------------------------- /trino-conf/node.properties: -------------------------------------------------------------------------------- ``` node.environment=production node.data-dir=/data/trino node.server-log-file=/var/log/trino/server.log node.launcher-log-file=/var/log/trino/launcher.log ``` -------------------------------------------------------------------------------- /etc/jvm.config: -------------------------------------------------------------------------------- ``` -server -Xmx2G -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:+ExitOnOutOfMemoryError -Djdk.attach.allowAttachSelf=true ``` -------------------------------------------------------------------------------- /trino-conf/jvm.config: -------------------------------------------------------------------------------- ``` -server -Xmx2G -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:+ExitOnOutOfMemoryError -Djdk.attach.allowAttachSelf=true ``` -------------------------------------------------------------------------------- /requirements-dev.txt: -------------------------------------------------------------------------------- ``` # Development dependencies pytest>=7.3.1 pytest-cov>=4.1.0 black>=23.0.0 isort>=5.12.0 mypy>=1.4.0 # SSE client for testing sseclient-py>=1.7.2 # HTTP client requests>=2.28.0 # Type checking types-requests>=2.28.0 ``` -------------------------------------------------------------------------------- /src/trino_mcp/__init__.py: -------------------------------------------------------------------------------- ```python """ Trino MCP server package. This package provides an MCP (Model Context Protocol) server for Trino. It enables AI systems to interact with Trino databases using the standardized MCP protocol. """ __version__ = "0.1.2" ``` -------------------------------------------------------------------------------- /etc/catalog/bullshit.properties: -------------------------------------------------------------------------------- ``` connector.name=hive hive.metastore.uri=thrift://hive-metastore:9083 hive.non-managed-table-writes-enabled=true hive.parquet.use-column-names=true hive.max-partitions-per-scan=1000000 hive.metastore-cache-ttl=60m hive.metastore-refresh-interval=5m ``` -------------------------------------------------------------------------------- /openapi.json: -------------------------------------------------------------------------------- ```json {"openapi":"3.1.0","info":{"title":"Trino MCP Health API","version":"0.1.0"},"paths":{"/health":{"get":{"summary":"Health","operationId":"health_health_get","responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{}}}}}}}}} ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` [pytest] testpaths = tests python_files = test_*.py python_classes = Test* python_functions = test_* markers = integration: marks tests as integration tests (deselect with '-m "not integration"') docker: marks tests that require docker (deselect with '-m "not docker"') filterwarnings = ignore::DeprecationWarning ignore::PendingDeprecationWarning ``` -------------------------------------------------------------------------------- /tools/setup/setup_tables.sql: -------------------------------------------------------------------------------- ```sql -- Setup Tables in Trino with Hive Metastore -- This script creates necessary schemas and tables, then loads the Parquet data -- Create a schema for our data CREATE SCHEMA IF NOT EXISTS bullshit.raw; -- Create a table for our parquet data CREATE TABLE IF NOT EXISTS bullshit.raw.bullshit_data ( id BIGINT, name VARCHAR, value DOUBLE, category VARCHAR, created_at TIMESTAMP ) WITH ( external_location = 'file:///opt/trino/data', format = 'PARQUET' ); -- Show tables in our schema SELECT * FROM bullshit.raw.bullshit_data LIMIT 10; -- Create a view for convenience CREATE OR REPLACE VIEW bullshit.raw.bullshit_summary AS SELECT category, COUNT(*) as count, AVG(value) as avg_value, MIN(value) as min_value, MAX(value) as max_value FROM bullshit.raw.bullshit_data GROUP BY category; -- Query the view SELECT * FROM bullshit.raw.bullshit_summary ORDER BY count DESC; ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.10-slim WORKDIR /app # Create a non-root user RUN groupadd -r trino && useradd --no-log-init -r -g trino trino && \ mkdir -p /app/logs && \ chown -R trino:trino /app # Install runtime dependencies RUN apt-get update && apt-get install -y --no-install-recommends \ curl \ build-essential \ && rm -rf /var/lib/apt/lists/* # Copy project files COPY . /app/ # Install the MCP server RUN pip install --no-cache-dir . # Set environment variables ENV PYTHONUNBUFFERED=1 ENV PYTHONDONTWRITEBYTECODE=1 ENV MCP_HOST=0.0.0.0 ENV MCP_PORT=8000 ENV TRINO_HOST=trino ENV TRINO_PORT=8080 ENV TRINO_USER=trino ENV TRINO_CATALOG=memory # Expose ports for SSE transport and LLM API EXPOSE 8000 8001 # Switch to non-root user USER trino # Health check - use port 8001 for the health check endpoint and LLM API HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ CMD curl -f http://localhost:8001/health || exit 1 # Default command (can be overridden) ENTRYPOINT ["python", "-m", "trino_mcp.server"] # Default arguments (can be overridden) CMD ["--transport", "sse", "--host", "0.0.0.0", "--port", "8000", "--trino-host", "trino", "--trino-port", "8080", "--debug"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [project] name = "trino-mcp" version = "0.1.2" description = "Model Context Protocol (MCP) server for Trino" readme = "README.md" requires-python = ">=3.10" authors = [ {name = "Trino MCP Team"} ] classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ] dependencies = [ "mcp>=1.3.0,<1.4.0", "fastapi>=0.100.0", "trino>=0.329.0", "pydantic>=2.0.0", "loguru>=0.7.0", "uvicorn>=0.23.0", "contextlib-chdir>=1.0.2", ] [project.optional-dependencies] dev = [ "black>=23.0.0", "isort>=5.12.0", "mypy>=1.4.0", "pytest>=7.3.1", "pytest-cov>=4.1.0", ] [project.scripts] trino-mcp = "trino_mcp.server:main" [tool.hatch.build.targets.wheel] packages = ["src/trino_mcp"] [tool.black] line-length = 100 target-version = ["py310"] [tool.isort] profile = "black" line_length = 100 [tool.mypy] python_version = "3.10" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true [[tool.mypy.overrides]] module = "tests.*" disallow_untyped_defs = false disallow_incomplete_defs = false ``` -------------------------------------------------------------------------------- /tools/run_queries.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # Wait for Trino to be ready echo "Waiting for Trino to be ready..." sleep 30 echo "Creating schema in memory catalog..." docker exec -it trino_mcp_trino_1 trino --execute "CREATE SCHEMA IF NOT EXISTS memory.bullshit" echo "Creating table with sample data..." docker exec -it trino_mcp_trino_1 trino --execute " CREATE TABLE memory.bullshit.bullshit_data AS SELECT * FROM ( VALUES (1, 'Sample 1', 10.5, 'A', TIMESTAMP '2023-01-01 12:00:00'), (2, 'Sample 2', 20.7, 'B', TIMESTAMP '2023-01-02 13:00:00'), (3, 'Sample 3', 15.2, 'A', TIMESTAMP '2023-01-03 14:00:00'), (4, 'Sample 4', 30.1, 'C', TIMESTAMP '2023-01-04 15:00:00'), (5, 'Sample 5', 25.8, 'B', TIMESTAMP '2023-01-05 16:00:00') ) AS t(id, name, value, category, created_at) " echo "Querying data from table..." docker exec -it trino_mcp_trino_1 trino --execute "SELECT * FROM memory.bullshit.bullshit_data" echo "Creating summary view..." docker exec -it trino_mcp_trino_1 trino --execute " CREATE OR REPLACE VIEW memory.bullshit.bullshit_summary AS SELECT category, COUNT(*) as count, AVG(value) as avg_value, MIN(value) as min_value, MAX(value) as max_value FROM memory.bullshit.bullshit_data GROUP BY category " echo "Querying summary view..." docker exec -it trino_mcp_trino_1 trino --execute "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC" echo "Setup complete." ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # Changelog ## v0.1.2 (2023-06-01) ### ✨ New Features - **Integrated LLM API**: Added a native REST API endpoint to the MCP server for direct LLM queries - **Built-in FastAPI Endpoint**: Port 8001 now exposes a JSON API for running SQL queries without wrapper scripts - **Query Endpoint**: Added `/api/query` endpoint for executing SQL against Trino with JSON responses ### 📝 Documentation - Updated README with API usage instructions - Added code examples for the REST API ## v0.1.1 (2023-05-17) ### 🐛 Bug Fixes - **Fixed Trino client catalog handling**: The Trino client now correctly sets the catalog in connection parameters instead of using unreliable `USE catalog` statements. - **Improved query execution**: Queries now correctly execute against specified catalogs. - **Added error handling**: Better error handling for catalog and schema operations. ### 📝 Documentation - Added detailed documentation about transport options and known issues. - Created test scripts demonstrating successful MCP-Trino interaction. - Documented workarounds for MCP 1.3.0 SSE transport issues. ### 🧪 Testing - Added `test_mcp_stdio.py` for testing MCP with STDIO transport. - Added catalog connection testing scripts and diagnostics. ### 🚧 Known Issues - MCP 1.3.0 SSE transport has issues with client disconnection. - Use STDIO transport for reliable operation until upgrading to a newer MCP version. ``` -------------------------------------------------------------------------------- /tools/setup/setup_data.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash set -e echo "Waiting for Trino to become available..." max_attempts=20 attempt=0 while [ $attempt -lt $max_attempts ]; do if curl -s "http://localhost:9095/v1/info" > /dev/null; then echo "Trino is available!" break else attempt=$((attempt + 1)) echo "Attempt $attempt/$max_attempts: Trino not yet available. Waiting 5 seconds..." sleep 5 fi done if [ $attempt -eq $max_attempts ]; then echo "Failed to connect to Trino after multiple attempts" exit 1 fi echo -e "\n=== Creating schema and table ===" # Create a schema and table that points to our Parquet files trino_query=" -- Create schema if it doesn't exist CREATE SCHEMA IF NOT EXISTS bullshit.datasets WITH (location = 'file:///opt/trino/data'); -- Create table pointing to our Parquet file CREATE TABLE IF NOT EXISTS bullshit.datasets.employees WITH ( external_location = 'file:///opt/trino/data/bullshit_data.parquet', format = 'PARQUET' ) AS SELECT * FROM parquet 'file:///opt/trino/data/bullshit_data.parquet'; " # Execute the queries echo "$trino_query" | curl -s -X POST -H "X-Trino-User: trino" --data-binary @- http://localhost:9095/v1/statement | jq echo -e "\n=== Verifying data ===" # Run a simple query to verify the table curl -s -X POST -H "X-Trino-User: trino" --data "SELECT COUNT(*) FROM bullshit.datasets.employees" http://localhost:9095/v1/statement | jq curl -s -X POST -H "X-Trino-User: trino" --data "SELECT * FROM bullshit.datasets.employees LIMIT 3" http://localhost:9095/v1/statement | jq echo -e "\nSetup complete!" ``` -------------------------------------------------------------------------------- /run_tests.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash set -e # Setup colors for output GREEN='\033[0;32m' RED='\033[0;31m' YELLOW='\033[0;33m' NC='\033[0m' # No Color echo -e "${YELLOW}Trino MCP Server Test Runner${NC}" echo "===============================" # Check for virtual environment if [ -z "$VIRTUAL_ENV" ]; then echo -e "${YELLOW}No virtual environment detected.${NC}" # Check if venv exists if [ -d "venv" ]; then echo -e "${GREEN}Activating existing virtual environment...${NC}" source venv/bin/activate else echo -e "${YELLOW}Creating new virtual environment...${NC}" python -m venv venv source venv/bin/activate echo -e "${GREEN}Installing dependencies...${NC}" pip install -e ".[dev]" fi fi # Function to check if a command exists command_exists() { command -v "$1" &> /dev/null } # Check for Trino availability echo -e "${YELLOW}Checking Trino availability...${NC}" TRINO_HOST=${TEST_TRINO_HOST:-localhost} TRINO_PORT=${TEST_TRINO_PORT:-9095} if command_exists curl; then if curl -s -o /dev/null -w "%{http_code}" http://${TRINO_HOST}:${TRINO_PORT}/v1/info | grep -q "200"; then echo -e "${GREEN}Trino is available at ${TRINO_HOST}:${TRINO_PORT}${NC}" else echo -e "${RED}WARNING: Trino does not appear to be available at ${TRINO_HOST}:${TRINO_PORT}.${NC}" echo -e "${YELLOW}Some tests may be skipped or fail.${NC}" echo -e "${YELLOW}You may need to start Trino with Docker: docker-compose up -d${NC}" missing_trino=true fi else echo -e "${YELLOW}curl not found, skipping Trino availability check${NC}" fi # Run the tests echo -e "${YELLOW}Running unit tests...${NC}" pytest tests/ -v --exclude=tests/integration echo "" echo -e "${YELLOW}Running integration tests...${NC}" echo -e "${YELLOW}(These may be skipped if Docker is not available)${NC}" pytest tests/integration/ -v echo "" echo -e "${GREEN}All tests completed!${NC}" ``` -------------------------------------------------------------------------------- /src/trino_mcp/config.py: -------------------------------------------------------------------------------- ```python """ Configuration module for the Trino MCP server. """ from dataclasses import dataclass, field from typing import Any, Dict, Optional @dataclass class TrinoConfig: """Configuration for the Trino connection.""" host: str = "localhost" port: int = 8080 user: str = "trino" password: Optional[str] = None catalog: Optional[str] = None schema: Optional[str] = None http_scheme: str = "http" auth: Optional[Any] = None max_attempts: int = 3 request_timeout: float = 30.0 http_headers: Dict[str, str] = field(default_factory=dict) verify: bool = True @property def connection_params(self) -> Dict[str, Any]: """Return connection parameters for the Trino client.""" params = { "host": self.host, "port": self.port, "user": self.user, "http_scheme": self.http_scheme, "max_attempts": self.max_attempts, "request_timeout": self.request_timeout, "verify": self.verify, } if self.password: params["password"] = self.password if self.auth: params["auth"] = self.auth if self.http_headers: params["http_headers"] = self.http_headers return params @dataclass class ServerConfig: """Configuration for the MCP server.""" name: str = "Trino MCP" version: str = "0.1.0" transport_type: str = "stdio" # "stdio" or "sse" host: str = "127.0.0.1" port: int = 3000 debug: bool = False trino: TrinoConfig = field(default_factory=TrinoConfig) def load_config_from_env() -> ServerConfig: """ Load configuration from environment variables. Returns: ServerConfig: The server configuration. """ # This would normally load from environment variables or a config file # For now, we'll just return the default config return ServerConfig() ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml version: '3.8' services: # Hive Metastore with embedded Derby hive-metastore: image: apache/hive:3.1.3 container_name: trino_mcp_hive_metastore_1 environment: SERVICE_NAME: metastore HIVE_METASTORE_WAREHOUSE_DIR: /opt/hive/warehouse command: /opt/hive/bin/hive --service metastore volumes: - ./data:/opt/hive/data - hive-data:/opt/hive/warehouse ports: - "9083:9083" networks: - trino-net restart: unless-stopped healthcheck: test: ["CMD", "nc", "-z", "localhost", "9083"] interval: 10s timeout: 5s retries: 5 start_period: 20s # Trino service trino: image: trinodb/trino:latest container_name: trino_mcp_trino_1 ports: - "9095:8080" volumes: - ./etc:/etc/trino:ro - ./data:/opt/trino/data:ro - trino-data:/data/trino - trino-logs:/var/log/trino environment: - JAVA_OPTS=-Xmx2G -XX:+UseG1GC networks: - trino-net depends_on: - hive-metastore deploy: resources: limits: cpus: '2' memory: 3G reservations: cpus: '0.5' memory: 1G healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"] interval: 10s timeout: 5s retries: 5 start_period: 30s restart: unless-stopped # MCP server for Trino trino-mcp: build: context: . dockerfile: Dockerfile container_name: trino_mcp_trino-mcp_1 ports: - "9096:8000" # Main MCP SSE port - "9097:8001" # LLM API port with health check endpoint volumes: - mcp-logs:/app/logs environment: - PYTHONUNBUFFERED=1 - TRINO_HOST=trino - TRINO_PORT=8080 - LOG_LEVEL=INFO - MCP_HOST=0.0.0.0 - MCP_PORT=8000 depends_on: trino: condition: service_healthy networks: - trino-net deploy: resources: limits: cpus: '1' memory: 1G reservations: cpus: '0.25' memory: 512M healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8001/health"] interval: 20s timeout: 5s retries: 3 start_period: 10s restart: unless-stopped networks: trino-net: driver: bridge name: trino_network volumes: trino-data: name: trino_data trino-logs: name: trino_logs mcp-logs: name: mcp_logs hive-data: name: hive_warehouse_data ``` -------------------------------------------------------------------------------- /llm_trino_api.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Simple FastAPI server that lets LLMs query Trino through MCP via a REST API. Run with: pip install fastapi uvicorn uvicorn llm_trino_api:app --reload This creates a REST API endpoint at: http://localhost:8000/query Example curl: curl -X POST "http://localhost:8000/query" \\ -H "Content-Type: application/json" \\ -d '{"query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3"}' """ import fastapi import pydantic from llm_query_trino import query_trino, format_results from typing import Optional, Dict, Any # Create FastAPI app app = fastapi.FastAPI( title="Trino MCP API for LLMs", description="Simple API to query Trino via MCP protocol for LLMs", version="0.1.0" ) # Define request model class QueryRequest(pydantic.BaseModel): query: str catalog: str = "memory" schema: Optional[str] = "bullshit" explain: bool = False # Define response model class QueryResponse(pydantic.BaseModel): success: bool message: str results: Optional[Dict[str, Any]] = None formatted_results: Optional[str] = None @app.post("/query", response_model=QueryResponse) async def trino_query(request: QueryRequest): """ Execute a SQL query against Trino via MCP and return results. Example: ```json { "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3", "catalog": "memory", "schema": "bullshit" } ``` """ try: # If explain mode is on, add EXPLAIN to the query query = request.query if request.explain: query = f"EXPLAIN {query}" # Execute the query results = query_trino(query, request.catalog, request.schema) # Check for errors if "error" in results: return QueryResponse( success=False, message=f"Query execution failed: {results['error']}", results=results ) # Format results for human readability formatted_results = format_results(results) return QueryResponse( success=True, message="Query executed successfully", results=results, formatted_results=formatted_results ) except Exception as e: return QueryResponse( success=False, message=f"Error executing query: {str(e)}" ) @app.get("/") async def root(): """Root endpoint with usage instructions.""" return { "message": "Trino MCP API for LLMs", "usage": "POST to /query with JSON body containing 'query', 'catalog' (optional), and 'schema' (optional)", "example": { "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3", "catalog": "memory", "schema": "bullshit" } } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=8008) ``` -------------------------------------------------------------------------------- /scripts/test_direct_query.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Direct test script that bypasses MCP and uses the Trino client directly. This helps determine if the issue is with the MCP protocol or with Trino. """ import time import argparse from typing import Optional, Dict, Any # Import the client class from the module from src.trino_mcp.trino_client import TrinoClient from src.trino_mcp.config import TrinoConfig def main(): """ Run direct queries against Trino without using MCP. """ print("Direct Trino test - bypassing MCP") # Configure Trino client config = TrinoConfig( host="localhost", port=9095, # The exposed Trino port user="trino", catalog="memory", schema=None, http_scheme="http" ) client = TrinoClient(config) try: # Connect to Trino print("Connecting to Trino...") client.connect() print("Connected successfully!") # List catalogs print("\nListing catalogs:") catalogs = client.get_catalogs() for catalog in catalogs: print(f"- {catalog['name']}") # List schemas in memory catalog print("\nListing schemas in memory catalog:") schemas = client.get_schemas("memory") for schema in schemas: print(f"- {schema['name']}") # Look for our test schema if any(schema['name'] == 'bullshit' for schema in schemas): print("\nFound our test schema 'bullshit'") # List tables print("\nListing tables in memory.bullshit:") tables = client.get_tables("memory", "bullshit") for table in tables: print(f"- {table['name']}") # Query the data table if any(table['name'] == 'bullshit_data' for table in tables): print("\nQuerying memory.bullshit.bullshit_data:") result = client.execute_query("SELECT * FROM memory.bullshit.bullshit_data") # Print columns print(f"Columns: {', '.join(result.columns)}") # Print rows print(f"Rows ({result.row_count}):") for row in result.rows: print(f" {row}") # Query the summary view if any(table['name'] == 'bullshit_summary' for table in tables): print("\nQuerying memory.bullshit.bullshit_summary:") result = client.execute_query( "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC" ) # Print columns print(f"Columns: {', '.join(result.columns)}") # Print rows print(f"Rows ({result.row_count}):") for row in result.rows: print(f" {row}") else: print("Summary view not found") else: print("Data table not found") else: print("Test schema 'bullshit' not found") except Exception as e: print(f"Error: {e}") finally: # Disconnect if client.conn: print("\nDisconnecting from Trino...") client.disconnect() print("Disconnected.") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/trino_mcp/resources/__init__.py: -------------------------------------------------------------------------------- ```python """ MCP resources for interacting with Trino. """ from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple from mcp.server.fastmcp import Context, FastMCP from trino_mcp.trino_client import TrinoClient def register_trino_resources(mcp: FastMCP, client: TrinoClient) -> None: """ Register Trino resources with the MCP server. Args: mcp: The MCP server instance. client: The Trino client instance. """ @mcp.resource("trino://catalog") def list_catalogs() -> List[Dict[str, Any]]: """ List all available Trino catalogs. """ return client.get_catalogs() @mcp.resource("trino://catalog/{catalog}") def get_catalog(catalog: str) -> Dict[str, Any]: """ Get information about a specific Trino catalog. """ # For now, just return basic info - could be enhanced later return {"name": catalog} @mcp.resource("trino://catalog/{catalog}/schemas") def list_schemas(catalog: str) -> List[Dict[str, Any]]: """ List all schemas in a Trino catalog. """ return client.get_schemas(catalog) @mcp.resource("trino://catalog/{catalog}/schema/{schema}") def get_schema(catalog: str, schema: str) -> Dict[str, Any]: """ Get information about a specific Trino schema. """ return {"name": schema, "catalog": catalog} @mcp.resource("trino://catalog/{catalog}/schema/{schema}/tables") def list_tables(catalog: str, schema: str) -> List[Dict[str, Any]]: """ List all tables in a Trino schema. """ return client.get_tables(catalog, schema) @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}") def get_table(catalog: str, schema: str, table: str) -> Dict[str, Any]: """ Get information about a specific Trino table. """ return client.get_table_details(catalog, schema, table) @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}/columns") def list_columns(catalog: str, schema: str, table: str) -> List[Dict[str, Any]]: """ List all columns in a Trino table. """ return client.get_columns(catalog, schema, table) @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}/column/{column}") def get_column(catalog: str, schema: str, table: str, column: str) -> Dict[str, Any]: """ Get information about a specific Trino column. """ columns = client.get_columns(catalog, schema, table) for col in columns: if col["name"] == column: return col # If column not found, return a basic structure return { "name": column, "catalog": catalog, "schema": schema, "table": table, "error": "Column not found" } @mcp.resource("trino://query/{query_id}") def get_query_result(query_id: str) -> Dict[str, Any]: """ Get the result of a specific Trino query by its ID. """ # This is a placeholder, as we don't store query results by ID in this basic implementation # In a real implementation, you would look up the query results from a cache or storage return { "query_id": query_id, "error": "Query results not available. This resource is for demonstration purposes only." } ``` -------------------------------------------------------------------------------- /scripts/fix_trino_session.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Direct test of Trino client session catalog handling. This script tests various ways to set the catalog name in Trino. """ import sys import time import traceback import trino def test_trino_sessions(): """Test different approaches to setting the catalog in Trino sessions""" print("🔬 Testing Trino session catalog handling") # Test 1: Default connection and USE statements print("\n=== Test 1: Default connection with USE statements ===") try: conn = trino.dbapi.connect( host="trino", port=8080, user="trino", http_scheme="http" ) print("Connection established") cursor1 = conn.cursor() # Try to set catalog with USE statement print("Setting catalog with USE statement") cursor1.execute("USE memory") # Try a query with the set catalog print("Executing query with set catalog") try: cursor1.execute("SELECT 1 as test") result = cursor1.fetchall() print(f"Result: {result}") except Exception as e: print(f"❌ Query failed: {e}") conn.close() except Exception as e: print(f"❌ Test 1 failed: {e}") traceback.print_exception(type(e), e, e.__traceback__) # Test 2: Connection with catalog parameter print("\n=== Test 2: Connection with catalog parameter ===") try: conn = trino.dbapi.connect( host="trino", port=8080, user="trino", http_scheme="http", catalog="memory" ) print("Connection established with catalog parameter") cursor2 = conn.cursor() # Try a query with the catalog parameter print("Executing query with catalog parameter") try: cursor2.execute("SELECT 1 as test") result = cursor2.fetchall() print(f"Result: {result}") except Exception as e: print(f"❌ Query failed: {e}") conn.close() except Exception as e: print(f"❌ Test 2 failed: {e}") traceback.print_exception(type(e), e, e.__traceback__) # Test 3: Explicit catalog in query print("\n=== Test 3: Explicit catalog in query ===") try: conn = trino.dbapi.connect( host="trino", port=8080, user="trino", http_scheme="http" ) print("Connection established") cursor3 = conn.cursor() # Try a query with explicit catalog in the query print("Executing query with explicit catalog") try: cursor3.execute("SELECT 1 as test FROM memory.information_schema.tables WHERE 1=0") result = cursor3.fetchall() print(f"Result: {result}") except Exception as e: print(f"❌ Query failed: {e}") conn.close() except Exception as e: print(f"❌ Test 3 failed: {e}") traceback.print_exception(type(e), e, e.__traceback__) # Test 4: Connection parameters with session properties print("\n=== Test 4: Connection with session properties ===") try: conn = trino.dbapi.connect( host="trino", port=8080, user="trino", http_scheme="http", catalog="memory", session_properties={"catalog": "memory"} ) print("Connection established with session properties") cursor4 = conn.cursor() # Try a query with session properties print("Executing query with session properties") try: cursor4.execute("SELECT 1 as test") result = cursor4.fetchall() print(f"Result: {result}") except Exception as e: print(f"❌ Query failed: {e}") conn.close() except Exception as e: print(f"❌ Test 4 failed: {e}") traceback.print_exception(type(e), e, e.__traceback__) print("\n🏁 Testing complete!") if __name__ == "__main__": test_trino_sessions() ``` -------------------------------------------------------------------------------- /examples/simple_mcp_query.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Simple example script for Trino MCP querying using STDIO transport. This demonstrates the most basic end-to-end flow of running a query through MCP. """ import json import subprocess import sys import time def run_query_with_mcp(sql_query: str, catalog: str = "memory"): """ Run a SQL query against Trino using the MCP STDIO transport. Args: sql_query: The SQL query to run catalog: The catalog to use (default: memory) Returns: The query results (if successful) """ print(f"🚀 Running query with Trino MCP") print(f"SQL: {sql_query}") print(f"Catalog: {catalog}") # Start the MCP server with STDIO transport cmd = [ "docker", "exec", "-i", "trino_mcp_trino-mcp_1", "python", "-m", "trino_mcp.server", "--transport", "stdio", "--trino-host", "trino", "--trino-port", "8080", "--trino-user", "trino", "--trino-catalog", catalog ] try: process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) # Allow server to start time.sleep(1) # Function to send requests and get responses def send_request(request): request_json = json.dumps(request) + "\n" process.stdin.write(request_json) process.stdin.flush() response = process.stdout.readline() if response: return json.loads(response) return None # Step 1: Initialize MCP print("\n1. Initializing MCP...") init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "simple-example", "version": "1.0.0" }, "capabilities": { "tools": True } } } init_response = send_request(init_request) if not init_response: raise Exception("Failed to initialize MCP") print("✅ MCP initialized") # Step 2: Send initialized notification init_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } send_request(init_notification) # Step 3: Execute query print("\n2. Executing query...") query_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": sql_query, "catalog": catalog } } } query_response = send_request(query_request) if not query_response: raise Exception("Failed to execute query") if "error" in query_response: error = query_response["error"] print(f"❌ Query failed: {error}") return None # Print and return results result = query_response["result"] print("\n✅ Query executed successfully!") # Format results for display if "columns" in result: print("\nColumns:", ", ".join(result.get("columns", []))) print(f"Row count: {result.get('row_count', 0)}") if "preview_rows" in result: print("\nResults:") for i, row in enumerate(result["preview_rows"]): print(f" {i+1}. {row}") return result except Exception as e: print(f"❌ Error: {e}") return None finally: # Clean up if 'process' in locals(): process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() if __name__ == "__main__": # Get query from command line args or use default query = "SELECT 'Hello from Trino MCP!' AS greeting" if len(sys.argv) > 1: query = sys.argv[1] # Run the query run_query_with_mcp(query) ``` -------------------------------------------------------------------------------- /tools/setup_bullshit_table.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Set up the bullshit schema and table in Trino """ import os import time import trino import pandas as pd from trino.exceptions import TrinoExternalError # Connect to Trino def connect_to_trino(): print("Waiting for Trino to become available...") max_attempts = 20 attempt = 0 while attempt < max_attempts: try: conn = trino.dbapi.connect( host="localhost", port=9095, user="trino", catalog="bullshit", schema="datasets", ) # Test the connection with conn.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() print("Trino is available!") return conn except Exception as e: attempt += 1 print(f"Attempt {attempt}/{max_attempts}: Trino not yet available. Waiting 5 seconds... ({str(e)})") time.sleep(5) raise Exception("Failed to connect to Trino after multiple attempts") # Create schema if it doesn't exist def create_schema(conn): print("Creating schema if it doesn't exist...") with conn.cursor() as cursor: try: # Try to list tables in the schema to see if it exists try: cursor.execute("SHOW TABLES FROM bullshit.datasets") rows = cursor.fetchall() print(f"Schema already exists with {len(rows)} tables") return except Exception as e: pass # Schema probably doesn't exist, continue to create it # Create schema cursor.execute(""" CREATE SCHEMA IF NOT EXISTS bullshit.datasets WITH (location = 'file:///bullshit-data') """) print("Schema created successfully") except Exception as e: print(f"Error creating schema: {e}") # Continue anyway, the error might be that the schema already exists # Get table schema from parquet file def get_parquet_schema(): print("Reading parquet file to determine schema...") try: df = pd.read_parquet('data/bullshit_data.parquet') # Map pandas dtypes to Trino types type_mapping = { 'int64': 'INTEGER', 'int32': 'INTEGER', 'float64': 'DOUBLE', 'float32': 'DOUBLE', 'object': 'VARCHAR', 'bool': 'BOOLEAN', 'datetime64[ns]': 'TIMESTAMP', } columns = [] for col_name, dtype in df.dtypes.items(): trino_type = type_mapping.get(str(dtype), 'VARCHAR') columns.append(f'"{col_name}" {trino_type}') return columns except Exception as e: print(f"Error reading parquet file: {e}") return None # Create the table def create_table(conn, columns): print("Creating table...") columns_str = ",\n ".join(columns) sql = f""" CREATE TABLE IF NOT EXISTS bullshit.datasets.employees ( {columns_str} ) WITH ( external_location = 'file:///bullshit-data/bullshit_data.parquet', format = 'PARQUET' ) """ print("SQL:", sql) with conn.cursor() as cursor: try: cursor.execute(sql) print("Table created successfully") except Exception as e: print(f"Error creating table: {e}") # Verify table was created by running a query def verify_table(conn): print("Verifying table creation...") with conn.cursor() as cursor: try: cursor.execute("SELECT * FROM bullshit.datasets.employees LIMIT 5") rows = cursor.fetchall() print(f"Successfully queried table with {len(rows)} rows") if rows: print("First row:") for row in rows: print(row) break except Exception as e: print(f"Error verifying table: {e}") def main(): try: conn = connect_to_trino() print("Connecting to Trino...") create_schema(conn) columns = get_parquet_schema() if columns: create_table(conn, columns) verify_table(conn) else: print("Failed to get table schema from parquet file") except Exception as e: print(f"An error occurred: {e}") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /tests/test_client.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Simple test script for the Trino MCP client. This script connects to the Trino MCP server and performs some basic operations. """ import json import os import sys import time import threading from typing import Dict, Any, List, Optional, Callable import requests import sseclient import logging from rich.console import Console # Default port for MCP server, changed to match docker-compose.yml DEFAULT_MCP_HOST = "localhost" DEFAULT_MCP_PORT = 9096 class SSEListener: """ Server-Sent Events (SSE) listener for MCP. This runs in a separate thread to receive notifications from the server. """ def __init__(self, url: str, message_callback: Callable[[Dict[str, Any]], None]): """ Initialize the SSE listener. Args: url: The SSE endpoint URL. message_callback: Callback function to handle incoming messages. """ self.url = url self.message_callback = message_callback self.running = False self.thread = None def start(self) -> None: """Start the SSE listener in a separate thread.""" if self.running: return self.running = True self.thread = threading.Thread(target=self._listen) self.thread.daemon = True self.thread.start() def stop(self) -> None: """Stop the SSE listener.""" self.running = False if self.thread: self.thread.join(timeout=1.0) self.thread = None def _listen(self) -> None: """Listen for SSE events.""" try: headers = {"Accept": "text/event-stream"} response = requests.get(self.url, headers=headers, stream=True) client = sseclient.SSEClient(response) for event in client.events(): if not self.running: break try: if event.data: data = json.loads(event.data) self.message_callback(data) except json.JSONDecodeError: print(f"Failed to parse SSE message: {event.data}") except Exception as e: print(f"Error processing SSE message: {e}") except Exception as e: if self.running: print(f"SSE connection error: {e}") finally: self.running = False def test_sse_client(base_url=f"http://localhost:{DEFAULT_MCP_PORT}"): """ Test communication with the SSE transport. Args: base_url: The base URL of the SSE server. """ print(f"Testing SSE client with {base_url}...") # First, let's check what endpoints are available print("Checking available endpoints...") try: response = requests.get(base_url) print(f"Root path status: {response.status_code}") if response.status_code == 200: print(f"Content: {response.text[:500]}") # Print first 500 chars except Exception as e: print(f"Error checking root path: {e}") # Try common MCP endpoints endpoints_to_check = [ "/mcp", "/mcp/sse", "/mcp/2024-11-05", "/mcp/message", "/api/mcp", "/api/mcp/sse" ] for endpoint in endpoints_to_check: try: url = f"{base_url}{endpoint}" print(f"\nChecking endpoint: {url}") response = requests.get(url) print(f"Status: {response.status_code}") if response.status_code == 200: print(f"Content: {response.text[:100]}") # Print first 100 chars except Exception as e: print(f"Error: {e}") # Try the /sse endpoint with proper SSE headers print("\nChecking SSE endpoint with proper headers...") try: sse_url = f"{base_url}/sse" print(f"Connecting to SSE endpoint: {sse_url}") # Setup SSE message handler def handle_sse_message(message): print(f"Received SSE message: {message.data}") # Use the SSEClient to connect properly print("Starting SSE connection...") headers = {"Accept": "text/event-stream"} response = requests.get(sse_url, headers=headers, stream=True) client = sseclient.SSEClient(response) # Try to get the first few events print("Waiting for SSE events...") event_count = 0 for event in client.events(): print(f"Event received: {event.data}") event_count += 1 if event_count >= 3: # Get at most 3 events break except Exception as e: print(f"Error with SSE connection: {e}") if __name__ == "__main__": # Get the server URL from environment or command line server_url = os.environ.get("SERVER_URL", f"http://localhost:{DEFAULT_MCP_PORT}") if len(sys.argv) > 1: server_url = sys.argv[1] test_sse_client(server_url) ``` -------------------------------------------------------------------------------- /scripts/test_quick_query.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Quick test script that runs a single query and exits properly. Shows that Trino works but is just empty. """ import json import subprocess import sys import time import threading import os def run_quick_query(): """Run a quick query against Trino via MCP and exit properly.""" print("🚀 Running quick query test - this should exit cleanly!") # Get the current directory for module path current_dir = os.path.abspath(os.path.dirname(__file__)) # Start the server process with STDIO transport process = subprocess.Popen( ["python", "src/trino_mcp/server.py", "--transport", "stdio"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # Line buffered env=dict(os.environ, PYTHONPATH=os.path.join(current_dir, "src")) ) # Create a thread to read stderr to prevent deadlocks def read_stderr(): for line in process.stderr: print(f"[SERVER] {line.strip()}") stderr_thread = threading.Thread(target=read_stderr, daemon=True) stderr_thread.start() # Wait a bit for the server to start up time.sleep(2) query_response = None try: # Send initialize request initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": {"name": "quick-query-test", "version": "1.0.0"}, "capabilities": {"tools": True, "resources": {"supportedSources": ["trino://catalog"]}} } } print(f"Sending initialize request: {json.dumps(initialize_request)}") process.stdin.write(json.dumps(initialize_request) + "\n") process.stdin.flush() # Read initialize response with timeout start_time = time.time() timeout = 5 initialize_response = None print("Waiting for initialize response...") while time.time() - start_time < timeout: response_line = process.stdout.readline().strip() if response_line: print(f"Got response: {response_line}") try: initialize_response = json.loads(response_line) break except json.JSONDecodeError as e: print(f"Error parsing response: {e}") time.sleep(0.1) if not initialize_response: print("❌ Timeout waiting for initialize response") return print(f"✅ Initialize response received: {initialize_response.get('result', {}).get('serverInfo', {}).get('name', 'unknown')}") # Send initialized notification with correct format initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } print(f"Sending initialized notification: {json.dumps(initialized_notification)}") process.stdin.write(json.dumps(initialized_notification) + "\n") process.stdin.flush() # Send query request - intentionally simple query that works with empty memory connector query_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT 'empty_as_fuck' AS status", "catalog": "memory" } } } print(f"Sending query request: {json.dumps(query_request)}") process.stdin.write(json.dumps(query_request) + "\n") process.stdin.flush() # Read query response with timeout start_time = time.time() query_response = None print("Waiting for query response...") while time.time() - start_time < timeout: response_line = process.stdout.readline().strip() if response_line: print(f"Got response: {response_line}") try: query_response = json.loads(response_line) break except json.JSONDecodeError as e: print(f"Error parsing response: {e}") time.sleep(0.1) if not query_response: print("❌ Timeout waiting for query response") return print("\n🔍 QUERY RESULTS:") if "error" in query_response: print(f"❌ Error: {query_response['error']}") else: result = query_response.get('result', {}) print(f"Query ID: {result.get('query_id', 'unknown')}") print(f"Columns: {result.get('columns', [])}") print(f"Row count: {result.get('row_count', 0)}") print(f"Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") except Exception as e: print(f"❌ Exception: {e}") finally: # Properly terminate print("\n👋 Test completed. Terminating server process...") process.terminate() try: process.wait(timeout=2) except subprocess.TimeoutExpired: process.kill() return query_response if __name__ == "__main__": run_quick_query() ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python """ Pytest configuration for the Trino MCP server tests. """ import os import time import json import pytest import requests import subprocess import signal from typing import Dict, Any, Iterator, Tuple # Define constants TEST_SERVER_PORT = 7000 # Using port 7000 to avoid ALL conflicts with existing containers TEST_SERVER_URL = f"http://localhost:{TEST_SERVER_PORT}" TRINO_HOST = os.environ.get("TEST_TRINO_HOST", "localhost") TRINO_PORT = int(os.environ.get("TEST_TRINO_PORT", "9095")) TRINO_USER = os.environ.get("TEST_TRINO_USER", "trino") class TrinoMCPTestServer: """Helper class to manage a test instance of the Trino MCP server.""" def __init__(self, port: int = TEST_SERVER_PORT): self.port = port self.process = None def start(self) -> None: """Start the server process.""" cmd = [ "python", "-m", "trino_mcp.server", "--transport", "sse", "--port", str(self.port), "--trino-host", TRINO_HOST, "--trino-port", str(TRINO_PORT), "--trino-user", TRINO_USER, "--trino-catalog", "memory", "--debug" ] env = os.environ.copy() env["PYTHONPATH"] = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for server to start self._wait_for_server() def stop(self) -> None: """Stop the server process.""" if self.process: self.process.send_signal(signal.SIGINT) self.process.wait() self.process = None def _wait_for_server(self, max_retries: int = 10, retry_interval: float = 0.5) -> None: """Wait for the server to become available.""" for _ in range(max_retries): try: response = requests.get(f"{TEST_SERVER_URL}/mcp") if response.status_code == 200: return except requests.exceptions.ConnectionError: pass time.sleep(retry_interval) raise TimeoutError(f"Server did not start within {max_retries * retry_interval} seconds") def check_trino_available() -> bool: """Check if Trino server is available for testing.""" try: response = requests.get(f"http://{TRINO_HOST}:{TRINO_PORT}/v1/info") return response.status_code == 200 except requests.exceptions.ConnectionError: return False class MCPClient: """Simple MCP client for testing.""" def __init__(self, base_url: str = TEST_SERVER_URL): self.base_url = base_url self.next_id = 1 self.initialized = False def initialize(self) -> Dict[str, Any]: """Initialize the MCP session.""" if self.initialized: return {"already_initialized": True} response = self._send_request("initialize", { "capabilities": {} }) self.initialized = True return response def list_tools(self) -> Dict[str, Any]: """List available tools.""" return self._send_request("tools/list") def list_resources(self, source: str = None, path: str = None) -> Dict[str, Any]: """List resources.""" params = {} if source: params["source"] = source if path: params["path"] = path return self._send_request("resources/list", params) def get_resource(self, source: str, path: str) -> Dict[str, Any]: """Get a specific resource.""" return self._send_request("resources/get", { "source": source, "path": path }) def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a tool with the given arguments.""" return self._send_request("tools/call", { "name": name, "arguments": arguments }) def shutdown(self) -> Dict[str, Any]: """Shutdown the MCP session.""" response = self._send_request("shutdown") self.initialized = False return response def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send a JSON-RPC request to the server.""" request = { "jsonrpc": "2.0", "id": self.next_id, "method": method } if params is not None: request["params"] = params self.next_id += 1 response = requests.post( f"{self.base_url}/mcp/message", json=request ) if response.status_code != 200: raise Exception(f"Request failed with status {response.status_code}: {response.text}") return response.json() @pytest.fixture(scope="session") def trino_available() -> bool: """Check if Trino is available.""" available = check_trino_available() if not available: pytest.skip("Trino server is not available for testing") return available @pytest.fixture(scope="session") def mcp_server(trino_available) -> Iterator[None]: """ Start a test instance of the Trino MCP server for the test session. Args: trino_available: Fixture to ensure Trino is available. Yields: None """ server = TrinoMCPTestServer() try: server.start() yield finally: server.stop() @pytest.fixture def mcp_client(mcp_server) -> Iterator[MCPClient]: """ Create a test MCP client connected to the test server. Args: mcp_server: The server fixture. Yields: MCPClient: An initialized MCP client. """ client = MCPClient() client.initialize() try: yield client finally: try: client.shutdown() except: pass # Ignore errors during shutdown ``` -------------------------------------------------------------------------------- /src/trino_mcp/tools/__init__.py: -------------------------------------------------------------------------------- ```python """ MCP tools for executing operations on Trino. """ from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union from loguru import logger from mcp.server.fastmcp import Context, FastMCP from trino_mcp.trino_client import TrinoClient def register_trino_tools(mcp: FastMCP, client: TrinoClient) -> None: """ Register Trino tools with the MCP server. Args: mcp: The MCP server instance. client: The Trino client instance. """ @mcp.tool() def execute_query( sql: str, catalog: Optional[str] = None, schema: Optional[str] = None ) -> Dict[str, Any]: """ Execute a SQL query against Trino. Args: sql: The SQL query to execute. catalog: Optional catalog name to use for the query. schema: Optional schema name to use for the query. Returns: Dict[str, Any]: Query results including metadata. """ logger.info(f"Executing query: {sql}") try: result = client.execute_query(sql, catalog, schema) # Format the result in a structured way formatted_result = { "query_id": result.query_id, "columns": result.columns, "row_count": result.row_count, "query_time_ms": result.query_time_ms } # Add preview of results (first 20 rows) preview_rows = [] max_preview_rows = min(20, len(result.rows)) for i in range(max_preview_rows): row_dict = {} for j, col in enumerate(result.columns): row_dict[col] = result.rows[i][j] preview_rows.append(row_dict) formatted_result["preview_rows"] = preview_rows # Include a resource path for full results formatted_result["resource_path"] = f"trino://query/{result.query_id}" return formatted_result except Exception as e: error_msg = str(e) logger.error(f"Query execution failed: {error_msg}") return { "error": error_msg, "query": sql } @mcp.tool() def cancel_query(query_id: str) -> Dict[str, Any]: """ Cancel a running query. Args: query_id: ID of the query to cancel. Returns: Dict[str, Any]: Result of the cancellation operation. """ logger.info(f"Cancelling query: {query_id}") try: success = client.cancel_query(query_id) if success: return { "success": True, "message": f"Query {query_id} cancelled successfully" } else: return { "success": False, "message": f"Failed to cancel query {query_id}" } except Exception as e: error_msg = str(e) logger.error(f"Query cancellation failed: {error_msg}") return { "success": False, "error": error_msg, "query_id": query_id } @mcp.tool() def inspect_table( catalog: str, schema: str, table: str ) -> Dict[str, Any]: """ Get detailed metadata about a table. Args: catalog: Catalog name. schema: Schema name. table: Table name. Returns: Dict[str, Any]: Table metadata including columns, statistics, etc. """ logger.info(f"Inspecting table: {catalog}.{schema}.{table}") try: table_details = client.get_table_details(catalog, schema, table) # Try to get a row count (this might not work on all connectors) try: count_result = client.execute_query( f"SELECT count(*) AS row_count FROM {catalog}.{schema}.{table}" ) if count_result.rows and count_result.rows[0]: table_details["row_count"] = count_result.rows[0][0] except Exception as e: logger.warning(f"Failed to get row count: {e}") # Get additional info from the information_schema if available try: info_schema_query = f""" SELECT column_name, data_type, is_nullable, column_default FROM {catalog}.information_schema.columns WHERE table_catalog = '{catalog}' AND table_schema = '{schema}' AND table_name = '{table}' """ info_schema_result = client.execute_query(info_schema_query) enhanced_columns = [] for col in table_details["columns"]: enhanced_col = col.copy() # Find matching info_schema row for row in info_schema_result.rows: if row[0] == col["name"]: enhanced_col["data_type"] = row[1] enhanced_col["is_nullable"] = row[2] enhanced_col["default"] = row[3] break enhanced_columns.append(enhanced_col) table_details["columns"] = enhanced_columns except Exception as e: logger.warning(f"Failed to get column details from information_schema: {e}") return table_details except Exception as e: error_msg = str(e) logger.error(f"Table inspection failed: {error_msg}") return { "error": error_msg, "catalog": catalog, "schema": schema, "table": table } ``` -------------------------------------------------------------------------------- /llm_query_trino.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Simple wrapper script for LLMs to query Trino through MCP. This script handles all the MCP protocol complexity, so the LLM only needs to focus on SQL. Usage: python llm_query_trino.py "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5" """ import json import subprocess import sys import time from typing import Dict, Any, List, Optional # Default configurations - modify as needed DEFAULT_CATALOG = "memory" DEFAULT_SCHEMA = "bullshit" def query_trino(sql_query: str, catalog: str = DEFAULT_CATALOG, schema: Optional[str] = DEFAULT_SCHEMA) -> Dict[str, Any]: """ Run a SQL query against Trino through MCP and return the results. Args: sql_query: The SQL query to execute catalog: Catalog name (default: memory) schema: Schema name (default: bullshit) Returns: Dictionary with query results or error """ print(f"\n🔍 Running query via Trino MCP:\n{sql_query}") # Start the MCP server with STDIO transport cmd = [ "docker", "exec", "-i", "trino_mcp_trino-mcp_1", "python", "-m", "trino_mcp.server", "--transport", "stdio", "--debug", "--trino-host", "trino", "--trino-port", "8080", "--trino-user", "trino", "--trino-catalog", catalog ] try: process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) # Wait for MCP server to start time.sleep(2) # Helper function to send requests def send_request(request, expect_response=True): request_str = json.dumps(request) + "\n" process.stdin.write(request_str) process.stdin.flush() if not expect_response: return None response_str = process.stdout.readline() if response_str: return json.loads(response_str) return None # Step 1: Initialize MCP init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "llm-query-client", "version": "1.0.0" }, "capabilities": { "tools": True } } } init_response = send_request(init_request) if not init_response: return {"error": "Failed to initialize MCP"} # Step 2: Send initialized notification init_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } send_request(init_notification, expect_response=False) # Step 3: Execute query query_args = {"sql": sql_query, "catalog": catalog} if schema: query_args["schema"] = schema query_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "execute_query", "arguments": query_args } } query_response = send_request(query_request) if not query_response: return {"error": "No response received for query"} if "error" in query_response: return {"error": query_response["error"]} # Step 4: Parse the content try: # Extract nested result content content_text = query_response.get("result", {}).get("content", [{}])[0].get("text", "{}") result_data = json.loads(content_text) # Clean up the results for easier consumption return { "success": True, "query_id": result_data.get("query_id", "unknown"), "columns": result_data.get("columns", []), "row_count": result_data.get("row_count", 0), "rows": result_data.get("preview_rows", []), "execution_time_ms": result_data.get("query_time_ms", 0) } except Exception as e: return { "error": f"Error parsing results: {str(e)}", "raw_response": query_response } except Exception as e: return {"error": f"Error: {str(e)}"} finally: if 'process' in locals() and process.poll() is None: process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() def format_results(results: Dict[str, Any]) -> str: """Format query results for display""" if "error" in results: return f"❌ Error: {results['error']}" if not results.get("success"): return f"❌ Query failed: {results}" output = [ f"✅ Query executed successfully!", f"📊 Rows: {results['row_count']}", f"⏱️ Execution Time: {results['execution_time_ms']:.2f}ms", f"\nColumns: {', '.join(results['columns'])}", f"\nResults:" ] # Table header if results["columns"]: header = " | ".join(f"{col.upper()}" for col in results["columns"]) output.append(header) output.append("-" * len(header)) # Table rows for row in results["rows"]: values = [] for col in results["columns"]: values.append(str(row.get(col, "NULL"))) output.append(" | ".join(values)) return "\n".join(output) def main(): """Run a query from command line arguments""" if len(sys.argv) < 2: print("Usage: python llm_query_trino.py 'SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5'") sys.exit(1) # Get the SQL query from command line sql_query = sys.argv[1] # Parse optional catalog and schema catalog = DEFAULT_CATALOG schema = DEFAULT_SCHEMA if len(sys.argv) > 2: catalog = sys.argv[2] if len(sys.argv) > 3: schema = sys.argv[3] # Execute the query results = query_trino(sql_query, catalog, schema) # Print formatted results print(format_results(results)) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /load_bullshit_data.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Quick script to load our bullshit data directly into Trino using the memory connector instead of relying on the Hive metastore which seems to be having issues. """ import pandas as pd import trino import time import sys # Configure Trino connection TRINO_HOST = "localhost" TRINO_PORT = 9095 TRINO_USER = "trino" TRINO_CATALOG = "memory" def main(): print("🚀 Loading bullshit data into Trino...") # Load the parquet data try: print("Reading the bullshit data...") df = pd.read_parquet('data/bullshit_data.parquet') print(f"Loaded {len(df)} rows of bullshit data") except Exception as e: print(f"❌ Failed to load parquet data: {e}") sys.exit(1) # Connect to Trino print(f"Connecting to Trino at {TRINO_HOST}:{TRINO_PORT}...") # Try to connect with retries max_attempts = 10 for attempt in range(1, max_attempts + 1): try: conn = trino.dbapi.connect( host=TRINO_HOST, port=TRINO_PORT, user=TRINO_USER, catalog=TRINO_CATALOG ) print("✅ Connected to Trino") break except Exception as e: print(f"Attempt {attempt}/{max_attempts} - Failed to connect: {e}") if attempt == max_attempts: print("❌ Could not connect to Trino after multiple attempts") sys.exit(1) time.sleep(2) # Create cursor cursor = conn.cursor() try: # Create a schema print("Creating bullshit schema...") cursor.execute("CREATE SCHEMA IF NOT EXISTS memory.bullshit") # Drop tables if they exist (memory connector doesn't support CREATE OR REPLACE) print("Dropping existing tables if they exist...") try: cursor.execute("DROP TABLE IF EXISTS memory.bullshit.bullshit_data") cursor.execute("DROP TABLE IF EXISTS memory.bullshit.real_bullshit_data") cursor.execute("DROP VIEW IF EXISTS memory.bullshit.bullshit_summary") except Exception as e: print(f"Warning during table drops: {e}") # Create a sample table for our bullshit data print("Creating sample bullshit_data table...") cursor.execute(""" CREATE TABLE memory.bullshit.bullshit_data ( id BIGINT, job_title VARCHAR, name VARCHAR, salary BIGINT, bullshit_factor INTEGER, boolean_flag BOOLEAN, enum_field VARCHAR ) """) # Insert sample data print("Inserting sample data...") cursor.execute(""" INSERT INTO memory.bullshit.bullshit_data VALUES (1, 'CEO', 'Sample Data', 250000, 10, TRUE, 'Option A'), (2, 'CTO', 'More Examples', 225000, 8, TRUE, 'Option B'), (3, 'Developer', 'Testing Data', 120000, 5, FALSE, 'Option C') """) # Now we'll load real data from our dataframe # For memory connector, we need to create a new table with the data print("Creating real_bullshit_data table with our generated data...") # Take a subset of columns for simplicity cols = ['id', 'name', 'job_title', 'salary', 'bullshit_factor', 'bullshit_statement', 'company'] df_subset = df[cols].head(100) # Take just 100 rows to keep it manageable # Handle NULL values - replace with empty strings for strings and 0 for numbers df_subset = df_subset.fillna({ 'name': 'Anonymous', 'job_title': 'Unknown', 'bullshit_statement': 'No statement', 'company': 'Unknown Co' }) df_subset = df_subset.fillna(0) # Create the table structure cursor.execute(""" CREATE TABLE memory.bullshit.real_bullshit_data ( id BIGINT, job_title VARCHAR, name VARCHAR, salary DOUBLE, bullshit_factor DOUBLE, bullshit_statement VARCHAR, company VARCHAR ) """) # Insert data in batches to avoid overly long SQL statements batch_size = 10 total_batches = (len(df_subset) + batch_size - 1) // batch_size # Ceiling division print(f"Inserting {len(df_subset)} rows in {total_batches} batches...") for batch_num in range(total_batches): start_idx = batch_num * batch_size end_idx = min(start_idx + batch_size, len(df_subset)) batch = df_subset.iloc[start_idx:end_idx] # Create VALUES part of SQL statement for this batch values_list = [] for _, row in batch.iterrows(): # Clean string values to prevent SQL injection/syntax errors job_title = str(row['job_title']).replace("'", "''") name = str(row['name']).replace("'", "''") statement = str(row['bullshit_statement']).replace("'", "''") company = str(row['company']).replace("'", "''") values_str = f"({row['id']}, '{job_title}', '{name}', {row['salary']}, {row['bullshit_factor']}, '{statement}', '{company}')" values_list.append(values_str) values_sql = ", ".join(values_list) # Insert batch insert_sql = f""" INSERT INTO memory.bullshit.real_bullshit_data VALUES {values_sql} """ cursor.execute(insert_sql) print(f"Batch {batch_num+1}/{total_batches} inserted.") # Create a summary view print("Creating summary view...") cursor.execute(""" CREATE VIEW memory.bullshit.bullshit_summary AS SELECT job_title, COUNT(*) as count, AVG(salary) as avg_salary, AVG(bullshit_factor) as avg_bs_factor FROM memory.bullshit.real_bullshit_data GROUP BY job_title """) # Query to verify print("\nVerifying data with a query:") cursor.execute("SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC") # Print results columns = [desc[0] for desc in cursor.description] print("\n" + " | ".join(columns)) print("-" * 80) rows = cursor.fetchall() for row in rows: print(" | ".join(str(cell) for cell in row)) print(f"\n✅ Successfully loaded {len(df_subset)} rows of bullshit data into Trino!") print("You can now query it with: SELECT * FROM memory.bullshit.real_bullshit_data") except Exception as e: print(f"❌ Error: {e}") finally: cursor.close() conn.close() print("Connection closed") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /scripts/test_stdio_trino.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Test script for the Trino MCP protocol using STDIO transport. This avoids the SSE transport issues we're encountering. """ import json import os import subprocess import sys import time from typing import Dict, Any, Optional, List def test_mcp_stdio(): """ Test the MCP protocol using a subprocess with STDIO transport. """ print("🚀 Testing Trino MCP with STDIO transport...") # Start the MCP server in a subprocess with STDIO transport print("Starting MCP server with STDIO transport...") mcp_server_cmd = [ "docker", "exec", "-it", "trino_mcp_trino-mcp_1", "python", "-m", "trino_mcp.server", "--transport", "stdio" ] process = subprocess.Popen( mcp_server_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 # Line buffered ) # Helper function to send a request and get a response def send_request(request: Dict[str, Any]) -> Optional[Dict[str, Any]]: request_str = json.dumps(request) + "\n" print(f"\n📤 Sending: {request_str.strip()}") process.stdin.write(request_str) process.stdin.flush() # Read response with timeout start_time = time.time() timeout = 10 # 10 seconds timeout response_str = None while time.time() - start_time < timeout: response_str = process.stdout.readline().strip() if response_str: print(f"📩 Received: {response_str}") try: return json.loads(response_str) except json.JSONDecodeError as e: print(f"❌ Error parsing response as JSON: {e}") time.sleep(0.1) print("⏱️ Timeout waiting for response") return None # Read any startup output to clear the buffer print("Waiting for server startup...") time.sleep(2) # Give the server time to start up # Initialize the protocol print("\n=== Step 1: Initialize MCP ===") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "trino-mcp-test-client", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } initialize_response = send_request(initialize_request) if not initialize_response: print("❌ Failed to initialize MCP") process.terminate() return print("✅ MCP initialized successfully") print(f"Server info: {json.dumps(initialize_response.get('result', {}).get('serverInfo', {}), indent=2)}") # Send initialized notification print("\n=== Step 2: Send initialized notification ===") initialized_notification = { "jsonrpc": "2.0", "method": "initialized" } _ = send_request(initialized_notification) print("✅ Initialized notification sent") # Get available tools print("\n=== Step 3: List available tools ===") tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } tools_response = send_request(tools_request) if not tools_response or "result" not in tools_response: print("❌ Failed to get tools list") process.terminate() return tools = tools_response.get("result", {}).get("tools", []) print(f"✅ Available tools: {len(tools)}") for tool in tools: print(f" - {tool.get('name')}: {tool.get('description')}") # Execute a simple query print("\n=== Step 4: Execute a query ===") query_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_data", "catalog": "memory" } } } query_response = send_request(query_request) if not query_response: print("❌ Failed to execute query") elif "error" in query_response: print(f"❌ Query error: {query_response.get('error')}") else: result = query_response.get("result", {}) row_count = result.get("row_count", 0) print(f"✅ Query executed successfully with {row_count} rows") # Print columns and preview rows print(f"Columns: {', '.join(result.get('columns', []))}") print("Preview rows:") for row in result.get("preview_rows", []): print(f" {row}") # Execute a summary query print("\n=== Step 5: Query the summary view ===") summary_request = { "jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC", "catalog": "memory" } } } summary_response = send_request(summary_request) if not summary_response: print("❌ Failed to execute summary query") elif "error" in summary_response: print(f"❌ Summary query error: {summary_response.get('error')}") else: result = summary_response.get("result", {}) row_count = result.get("row_count", 0) print(f"✅ Summary query executed successfully with {row_count} rows") # Print columns and preview rows print(f"Columns: {', '.join(result.get('columns', []))}") print("Summary data:") for row in result.get("preview_rows", []): print(f" {row}") # List available resources print("\n=== Step 6: List available resources ===") resources_request = { "jsonrpc": "2.0", "id": 5, "method": "resources/list", "params": { "source": "trino://catalog" } } resources_response = send_request(resources_request) if not resources_response or "result" not in resources_response: print("❌ Failed to get resources list") else: resources = resources_response.get("result", {}).get("resources", []) print(f"✅ Available resources: {len(resources)}") for resource in resources: print(f" - {resource}") # Clean up the process print("\n=== Finishing test ===") process.terminate() try: process.wait(timeout=5) print("✅ MCP server process terminated") except subprocess.TimeoutExpired: print("⚠️ Had to force kill the MCP server process") process.kill() # Check for errors in stderr stderr = process.stderr.read() if stderr: print("\n⚠️ Server stderr output:") print(stderr) print("\n🏁 Test completed!") if __name__ == "__main__": test_mcp_stdio() ``` -------------------------------------------------------------------------------- /scripts/test_fixed_client.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Fixed test script for the MCP server that uses the correct notification format. This should actually work with MCP 1.3.0. """ import json import requests import sys import time import sseclient from rich.console import Console console = Console() def test_mcp(): """ Test the MCP server with proper message formats. Fixes the notification format to work with MCP 1.3.0. """ console.print("[bold green]🚀 Starting MCP client test with fixed notification format[/]") # Connect to SSE endpoint console.print("[bold blue]Connecting to SSE endpoint...[/]") headers = {"Accept": "text/event-stream"} sse_response = requests.get("http://localhost:9096/sse", headers=headers, stream=True) client = sseclient.SSEClient(sse_response) # Get the messages URL from the first event messages_url = None session_id = None for event in client.events(): console.print(f"[cyan]SSE event:[/] {event.event}") console.print(f"[cyan]SSE data:[/] {event.data}") if event.event == "endpoint": messages_url = f"http://localhost:9096{event.data}" # Extract session ID from URL if "session_id=" in event.data: session_id = event.data.split("session_id=")[1] console.print(f"[green]Got messages URL:[/] {messages_url}") console.print(f"[green]Session ID:[/] {session_id}") break if not messages_url: console.print("[bold red]Failed to get messages URL from SSE[/]") return # Now we have the messages URL, send initialize request console.print(f"\n[bold blue]Sending initialize request to {messages_url}[/]") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "fixed-test-client", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } try: response = requests.post(messages_url, json=initialize_request) console.print(f"[cyan]Status code:[/] {response.status_code}") console.print(f"[cyan]Response:[/] {response.text}") # Continue listening for events to get the response console.print("\n[bold blue]Listening for response events...[/]") # Start a timeout counter start_time = time.time() timeout = 30 # 30 seconds timeout # Keep listening for events for event in client.events(): # Skip ping events if event.event == "ping": continue console.print(f"[magenta]Event type:[/] {event.event}") console.print(f"[magenta]Event data:[/] {event.data}") # If we get a message event, parse it if event.event == "message" and event.data: try: data = json.loads(event.data) console.print(f"[green]Parsed message:[/] {json.dumps(data, indent=2)}") # Check if this is a response to our initialize request if "id" in data and data["id"] == 1: # Send an initialization notification with CORRECT FORMAT console.print("\n[bold blue]Sending initialized notification with correct format...[/]") initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", # FIXED: correct method name "params": {} # FIXED: added required params } response = requests.post(messages_url, json=initialized_notification) console.print(f"[cyan]Status code:[/] {response.status_code}") console.print(f"[cyan]Response:[/] {response.text}") # Now send a tools/list request console.print("\n[bold blue]Sending tools/list request...[/]") tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } response = requests.post(messages_url, json=tools_request) console.print(f"[cyan]Status code:[/] {response.status_code}") console.print(f"[cyan]Response:[/] {response.text}") # Check if this is a response to our tools/list request if "id" in data and data["id"] == 2: # Now send a resources/list request for trino catalogs console.print("\n[bold blue]Sending resources/list request for trino catalogs...[/]") resources_request = { "jsonrpc": "2.0", "id": 3, "method": "resources/list", "params": { "source": "trino://catalog" } } response = requests.post(messages_url, json=resources_request) console.print(f"[cyan]Status code:[/] {response.status_code}") console.print(f"[cyan]Response:[/] {response.text}") # If we get the resource list, try to execute a query if "id" in data and data["id"] == 3: console.print("\n[bold green]🔥 Got resources! Now trying to execute a query...[/]") query_request = { "jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT 1 AS test_value, 'it works!' AS message", "catalog": "memory" } } } response = requests.post(messages_url, json=query_request) console.print(f"[cyan]Status code:[/] {response.status_code}") console.print(f"[cyan]Response:[/] {response.text}") except Exception as e: console.print(f"[bold red]Error parsing message:[/] {e}") # Check timeout if time.time() - start_time > timeout: console.print("[bold yellow]Timeout waiting for response[/]") break except KeyboardInterrupt: console.print("\n[bold yellow]Exiting...[/]") except Exception as e: console.print(f"[bold red]Error:[/] {e}") finally: # Close the SSE connection sse_response.close() console.print("[bold green]Test completed. Connection closed.[/]") if __name__ == "__main__": test_mcp() ``` -------------------------------------------------------------------------------- /tools/create_bullshit_data.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Create a bullshit parquet file full of random silly data for Trino to query. """ import os import pandas as pd import numpy as np from datetime import datetime, timedelta import random import string # Make this shit reproducible random.seed(42069) np.random.seed(42069) def random_company_name(): """Generate a ridiculous startup name.""" prefixes = ["Block", "Hash", "Crypto", "Data", "Quantum", "Neural", "Cloud", "Cyber", "Meta", "Digital", "AI", "ML", "Algo", "Bit", "Logic", "Hyper", "Ultra", "Deep", "Sync", "Tech"] suffixes = ["Chain", "Flow", "Mind", "Logic", "Base", "Scale", "Cube", "Stream", "Grid", "Verse", "Net", "Ware", "Hub", "Pulse", "Sense", "Node", "Edge", "Core", "Link", "Matrix"] return f"{random.choice(prefixes)}{random.choice(suffixes)}" def random_bullshit_job_title(): """Generate a bullshit job title.""" prefix = ["Chief", "Senior", "Lead", "Global", "Dynamic", "Principal", "Executive", "Head of", "Director of", "VP of", "Distinguished", "Advanced", "Master", "Innovation", "Transformation"] middle = ["Digital", "Data", "Blockchain", "AI", "Experience", "Product", "Solutions", "Technical", "Strategic", "Cloud", "Enterprise", "Creative", "Platform", "Innovation", "Disruption"] suffix = ["Officer", "Architect", "Evangelist", "Guru", "Ninja", "Rockstar", "Wizard", "Jedi", "Explorer", "Catalyst", "Visionary", "Storyteller", "Hacker", "Champion", "Designer"] return f"{random.choice(prefix)} {random.choice(middle)} {random.choice(suffix)}" def random_email(name): """Generate a random email based on a name.""" domains = ["gmail.com", "hotmail.com", "yahoo.com", "outlook.com", "icloud.com", "protonmail.com", "example.com", "bullshit.io", "fakeaf.dev", "notreal.net"] name_part = name.lower().replace(" ", "") return f"{name_part}{random.randint(1, 999)}@{random.choice(domains)}" def random_ip(): """Generate a random IP address.""" return f"{random.randint(1, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}" def random_name(): """Generate a random person name.""" first_names = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda", "William", "Elizabeth", "David", "Susan", "Richard", "Jessica", "Joseph", "Sarah", "Thomas", "Karen", "Charles", "Nancy", "Skyler", "Jesse", "Walter", "Saul", "Mike"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", "White", "Goodman", "Pinkman", "Fring", "Ehrmantraut", "Schrader", "Wexler"] return f"{random.choice(first_names)} {random.choice(last_names)}" def random_sentence(): """Generate a random bullshit sentence.""" subjects = ["Our company", "The team", "This product", "The algorithm", "Our platform", "The API", "Our solution", "The dashboard", "Our methodology", "The framework", "This breakthrough"] verbs = ["leverages", "utilizes", "implements", "optimizes", "integrates", "streamlines", "facilitates", "enables", "empowers", "revolutionizes", "disrupts", "transforms", "synergizes with"] adjectives = ["cutting-edge", "next-generation", "state-of-the-art", "innovative", "advanced", "robust", "scalable", "agile", "dynamic", "intuitive", "seamless", "bleeding-edge"] nouns = ["blockchain", "AI", "machine learning", "cloud computing", "big data", "IoT", "microservices", "neural networks", "quantum computing", "edge computing", "digital transformation", "DevOps"] benefits = ["increasing efficiency", "maximizing ROI", "driving growth", "boosting productivity", "enhancing performance", "reducing overhead", "optimizing workflows", "minimizing downtime", "accelerating innovation", "enabling scalability", "facilitating collaboration"] return f"{random.choice(subjects)} {random.choice(verbs)} {random.choice(adjectives)} {random.choice(nouns)} for {random.choice(benefits)}." def generate_bullshit_data(num_rows=1000): """Generate a DataFrame of complete bullshit data.""" print(f"Generating {num_rows} rows of absolute bullshit...") # Generate random data data = { "id": list(range(1, num_rows + 1)), "name": [random_name() for _ in range(num_rows)], "email": [], # Will fill after generating names "company": [random_company_name() for _ in range(num_rows)], "job_title": [random_bullshit_job_title() for _ in range(num_rows)], "salary": np.random.normal(150000, 50000, num_rows).astype(int), # Ridiculously high tech salaries "bullshit_factor": np.random.randint(1, 11, num_rows), # On a scale of 1-10 "ip_address": [random_ip() for _ in range(num_rows)], "created_at": [(datetime.now() - timedelta(days=random.randint(0, 365 * 3))).strftime('%Y-%m-%d %H:%M:%S') for _ in range(num_rows)], "last_active": [(datetime.now() - timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d %H:%M:%S') for _ in range(num_rows)], "account_status": np.random.choice(['active', 'inactive', 'suspended', 'pending'], num_rows, p=[0.7, 0.1, 0.1, 0.1]), "login_count": np.random.randint(1, 1000, num_rows), "buzzword_quota": np.random.randint(5, 100, num_rows), "bullshit_statement": [random_sentence() for _ in range(num_rows)], "favorite_framework": np.random.choice(['React', 'Angular', 'Vue', 'Svelte', 'Django', 'Flask', 'Spring', 'Rails'], num_rows), "preferred_language": np.random.choice(['Python', 'JavaScript', 'Java', 'C#', 'Go', 'Rust', 'TypeScript', 'Ruby'], num_rows), "coffee_consumption": np.random.randint(1, 10, num_rows), # Cups per day "meeting_hours": np.random.randint(0, 40, num_rows), # Hours per week "actual_work_hours": np.random.randint(0, 40, num_rows), # Hours per week "bugs_created": np.random.randint(0, 100, num_rows), "bugs_fixed": [], # Will calculate after bugs_created "productivity_score": np.random.rand(num_rows) * 100, "gitlab_commits": np.random.negative_binomial(5, 0.5, num_rows), # Most people commit very little "stackoverflow_reputation": np.random.exponential(1000, num_rows).astype(int), "random_float": np.random.rand(num_rows) * 100, "boolean_flag": np.random.choice([True, False], num_rows), "enum_field": np.random.choice(['Option A', 'Option B', 'Option C', 'Option D'], num_rows), "null_percentage": np.random.rand(num_rows) * 100, } # Generate dependent fields for i in range(num_rows): # Email based on name data["email"].append(random_email(data["name"][i])) # Bugs fixed is some percentage of bugs created fix_rate = random.uniform(0.5, 1.2) # Sometimes they fix more bugs than they create! data["bugs_fixed"].append(int(data["bugs_created"][i] * fix_rate)) # Create DataFrame df = pd.DataFrame(data) # Add some NULL values for realism for col in df.columns: if col != 'id': # Keep id intact null_mask = np.random.random(num_rows) < 0.05 # 5% chance of NULL df.loc[null_mask, col] = None return df def main(): """Main function to create and save the bullshit data.""" # Create data directory if it doesn't exist data_dir = "data" os.makedirs(data_dir, exist_ok=True) # Generate bullshit data df = generate_bullshit_data(num_rows=10000) # 10,000 rows of pure nonsense # Save as parquet parquet_path = os.path.join(data_dir, "bullshit_data.parquet") df.to_parquet(parquet_path, index=False) print(f"Saved bullshit data to {parquet_path}") # Print some sample data print("\nSample of the bullshit data:") print(df.head()) # Print column info print("\nColumn data types:") print(df.dtypes) # Print basic stats print("\nBasic statistics:") print(df.describe()) # Also save as CSV for easy inspection csv_path = os.path.join(data_dir, "bullshit_data.csv") df.to_csv(csv_path, index=False) print(f"Also saved as CSV to {csv_path} for easy inspection") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /test_llm_api.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Test script for the LLM API endpoint in the Trino MCP server. This script tests the various endpoints of the API server to verify functionality. """ import json import requests from rich.console import Console from rich.table import Table from typing import Dict, Any, List, Optional # Configuration API_HOST = "localhost" API_PORT = 9097 API_BASE_URL = f"http://{API_HOST}:{API_PORT}" console = Console() def test_endpoint(url: str, method: str = "GET", data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Test an endpoint and return the response with detailed info.""" console.print(f"\n[bold blue]Testing {method} {url}[/bold blue]") try: if method.upper() == "GET": response = requests.get(url, timeout=5) elif method.upper() == "POST": response = requests.post(url, json=data, timeout=5) else: console.print(f"[bold red]Unsupported method: {method}[/bold red]") return {"success": False, "status_code": 0, "error": f"Unsupported method: {method}"} status_color = "green" if response.status_code < 400 else "red" console.print(f"Status: [bold {status_color}]{response.status_code} - {response.reason}[/bold {status_color}]") # Try to parse response as JSON try: data = response.json() console.print("Response data:") console.print(json.dumps(data, indent=2)) return { "success": response.status_code < 400, "status_code": response.status_code, "data": data } except ValueError: console.print(f"Response text: {response.text[:500]}") return { "success": response.status_code < 400, "status_code": response.status_code, "text": response.text[:500] } except Exception as e: console.print(f"[bold red]Error: {str(e)}[/bold red]") return {"success": False, "error": str(e)} def discover_all_endpoints() -> None: """Discover available endpoints by trying common paths.""" console.print("[bold yellow]Discovering endpoints...[/bold yellow]") # Common endpoints to check endpoints = [ "/", "/api", "/docs", "/redoc", "/openapi.json", "/health", "/api/query", "/query" ] results = [] for endpoint in endpoints: url = f"{API_BASE_URL}{endpoint}" result = test_endpoint(url) results.append({ "endpoint": endpoint, "status": result["status_code"] if "status_code" in result else "Error", "success": result.get("success", False) }) # Display results in a table table = Table(title="API Endpoint Discovery Results") table.add_column("Endpoint", style="cyan") table.add_column("Status", style="magenta") table.add_column("Result", style="green") for result in results: status = str(result["status"]) status_style = "green" if result["success"] else "red" result_text = "✅ Available" if result["success"] else "❌ Not Available" table.add_row(result["endpoint"], f"[{status_style}]{status}[/{status_style}]", result_text) console.print(table) def test_api_docs() -> bool: """Test the API documentation endpoint.""" console.print("\n[bold yellow]Testing API documentation...[/bold yellow]") # Try the /docs endpoint first url = f"{API_BASE_URL}/docs" result = test_endpoint(url) if result.get("success", False): console.print("[bold green]API documentation is available at /docs[/bold green]") return True else: console.print("[bold red]API documentation not available at /docs[/bold red]") # Try the OpenAPI JSON endpoint url = f"{API_BASE_URL}/openapi.json" result = test_endpoint(url) if result.get("success", False): console.print("[bold green]OpenAPI spec is available at /openapi.json[/bold green]") # Try to extract query endpoint from the spec if "data" in result: try: paths = result["data"].get("paths", {}) for path, methods in paths.items(): if "post" in methods and ("/query" in path or "/api/query" in path): console.print(f"[bold green]Found query endpoint in OpenAPI spec: {path}[/bold green]") return True except: console.print("[bold red]Failed to parse OpenAPI spec[/bold red]") return True else: console.print("[bold red]OpenAPI spec not available[/bold red]") return False def test_valid_query() -> bool: """Test a valid SQL query against the API.""" console.print("\n[bold yellow]Testing valid SQL query...[/bold yellow]") # Try multiple potential query endpoints query_payload = { "query": "SELECT 1 AS test", "catalog": "memory", "schema": "default" } endpoints = ["/api/query", "/query"] for endpoint in endpoints: url = f"{API_BASE_URL}{endpoint}" console.print(f"[bold]Trying endpoint: {endpoint}[/bold]") result = test_endpoint(url, method="POST", data=query_payload) if result.get("success", False): console.print(f"[bold green]Successfully executed query at {endpoint}[/bold green]") # Display results if available if "data" in result and "results" in result["data"]: display_query_results(result["data"]["results"]) return True console.print("[bold red]Failed to execute query on any endpoint[/bold red]") return False def test_invalid_query() -> bool: """Test an invalid SQL query to check error handling.""" console.print("\n[bold yellow]Testing invalid SQL query (error handling)...[/bold yellow]") query_payload = { "query": "SELECT * FROM nonexistent_table", "catalog": "memory", "schema": "default" } # Try the same endpoints as for valid query endpoints = ["/api/query", "/query"] for endpoint in endpoints: url = f"{API_BASE_URL}{endpoint}" console.print(f"[bold]Trying endpoint: {endpoint}[/bold]") result = test_endpoint(url, method="POST", data=query_payload) # Check if we got a proper error response (should be 400 Bad Request) if "status_code" in result and result["status_code"] == 400: console.print(f"[bold green]API correctly rejected invalid query at {endpoint} with 400 status[/bold green]") return True console.print("[bold red]Failed to properly handle invalid query on any endpoint[/bold red]") return False def display_query_results(results: Dict[str, Any]) -> None: """Display query results in a formatted table.""" if not results or "rows" not in results or not results["rows"]: console.print("[italic yellow]No results returned[/italic yellow]") return table = Table(title="Query Results") # Add columns to the table for column in results.get("columns", []): table.add_column(column) # Add data rows for row in results.get("rows", []): if isinstance(row, dict): table.add_row(*[str(row.get(col, "")) for col in results.get("columns", [])]) elif isinstance(row, list): table.add_row(*[str(val) for val in row]) console.print(table) console.print(f"[italic]Total rows: {results.get('row_count', len(results.get('rows', [])))}[/italic]") if "execution_time_ms" in results: console.print(f"[italic]Execution time: {results['execution_time_ms']} ms[/italic]") def main() -> None: """Run all tests.""" console.print("[bold green]=== Trino MCP LLM API Test ===\n[/bold green]") # First discover all available endpoints discover_all_endpoints() # Test API documentation docs_available = test_api_docs() # Only test queries if docs are available if docs_available: test_valid_query() test_invalid_query() else: console.print("[bold red]Skipping query tests as API documentation is not available[/bold red]") console.print("\n[bold green]=== Test completed ===\n[/bold green]") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/trino_mcp/trino_client.py: -------------------------------------------------------------------------------- ```python """ Trino client wrapper for interacting with Trino. """ from __future__ import annotations import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple, Union import trino from loguru import logger from trino_mcp.config import TrinoConfig @dataclass class TrinoQueryResult: """A class to represent the result of a Trino query.""" query_id: str columns: List[str] rows: List[List[Any]] query_time_ms: float row_count: int class TrinoClient: """ A wrapper around the trino-python client to interact with Trino. """ def __init__(self, config: TrinoConfig): """ Initialize the Trino client. Args: config: Trino connection configuration. """ self.config = config self.conn = None self.current_catalog = config.catalog self.current_schema = config.schema def connect(self) -> None: """ Connect to the Trino server. This will connect to Trino with the catalog parameter if provided. """ logger.info(f"Connecting to Trino at {self.config.host}:{self.config.port}") # Create connection params including catalog from config conn_params = self.config.connection_params # Connect to Trino with proper parameters self.conn = trino.dbapi.connect(**conn_params) def disconnect(self) -> None: """ Disconnect from the Trino server. """ if self.conn: logger.info("Disconnecting from Trino") self.conn.close() self.conn = None def ensure_connection(self) -> None: """ Ensure that the client is connected to Trino. """ if not self.conn: self.connect() def execute_query( self, sql: str, catalog: Optional[str] = None, schema: Optional[str] = None ) -> TrinoQueryResult: """ Execute a SQL query against Trino. Important note on catalog handling: This method properly sets the catalog by updating the connection parameters, rather than using unreliable "USE catalog" statements. The catalog is passed directly to the connection, which is more reliable than SQL-based catalog switching. Args: sql: The SQL query to execute. catalog: Optional catalog name to use for the query. schema: Optional schema name to use for the query. Returns: TrinoQueryResult: The result of the query. """ # If we're switching catalogs or don't have a connection, we need to reconnect use_catalog = catalog or self.current_catalog if self.conn and (use_catalog != self.current_catalog): logger.info(f"Switching catalog from {self.current_catalog} to {use_catalog}, reconnecting...") self.disconnect() # Update current catalog and schema self.current_catalog = use_catalog if schema: self.current_schema = schema # Update the config catalog before connecting if use_catalog: self.config.catalog = use_catalog # Ensure connection with updated catalog self.ensure_connection() # Create a cursor cursor = self.conn.cursor() # If we have a schema, try to set it # This still uses a USE statement, but catalogs are now set in the connection if self.current_schema: try: logger.debug(f"Setting schema to {self.current_schema}") # Make sure to include catalog with schema to avoid errors if self.current_catalog: cursor.execute(f"USE {self.current_catalog}.{self.current_schema}") else: logger.warning("Cannot set schema without catalog") except Exception as e: logger.warning(f"Failed to set schema: {e}") try: # Execute the query and time it logger.debug(f"Executing query: {sql}") start_time = time.time() cursor.execute(sql) query_time = time.time() - start_time # Fetch the query ID, metadata and results query_id = cursor.stats.get("queryId", "unknown") columns = [desc[0] for desc in cursor.description] if cursor.description else [] rows = cursor.fetchall() if cursor.description else [] return TrinoQueryResult( query_id=query_id, columns=columns, rows=rows, query_time_ms=query_time * 1000, row_count=len(rows) ) except Exception as e: logger.error(f"Query execution failed: {e}") raise def get_catalogs(self) -> List[Dict[str, str]]: """ Get a list of all catalogs in Trino. Returns: List[Dict[str, str]]: A list of catalog metadata. """ result = self.execute_query("SHOW CATALOGS") return [{"name": row[0]} for row in result.rows] def get_schemas(self, catalog: str) -> List[Dict[str, str]]: """ Get a list of all schemas in a catalog. Args: catalog: The catalog name. Returns: List[Dict[str, str]]: A list of schema metadata. """ result = self.execute_query(f"SHOW SCHEMAS FROM {catalog}", catalog=catalog) return [{"name": row[0], "catalog": catalog} for row in result.rows] def get_tables(self, catalog: str, schema: str) -> List[Dict[str, str]]: """ Get a list of all tables in a schema. Args: catalog: The catalog name. schema: The schema name. Returns: List[Dict[str, str]]: A list of table metadata. """ result = self.execute_query(f"SHOW TABLES FROM {catalog}.{schema}", catalog=catalog, schema=schema) return [{"name": row[0], "catalog": catalog, "schema": schema} for row in result.rows] def get_columns(self, catalog: str, schema: str, table: str) -> List[Dict[str, Any]]: """ Get a list of all columns in a table. Args: catalog: The catalog name. schema: The schema name. table: The table name. Returns: List[Dict[str, Any]]: A list of column metadata. """ result = self.execute_query( f"DESCRIBE {catalog}.{schema}.{table}", catalog=catalog, schema=schema ) columns = [] for row in result.rows: columns.append({ "name": row[0], "type": row[1], "extra": row[2] if len(row) > 2 else None, "catalog": catalog, "schema": schema, "table": table }) return columns def get_table_details(self, catalog: str, schema: str, table: str) -> Dict[str, Any]: """ Get detailed information about a table including columns and statistics. Args: catalog: The catalog name. schema: The schema name. table: The table name. Returns: Dict[str, Any]: Detailed table information. """ columns = self.get_columns(catalog, schema, table) # Get table statistics if available (might not be supported by all connectors) try: stats_query = f""" SELECT * FROM {catalog}.information_schema.tables WHERE table_catalog = '{catalog}' AND table_schema = '{schema}' AND table_name = '{table}' """ stats_result = self.execute_query(stats_query, catalog=catalog) stats = {} if stats_result.rows: row = stats_result.rows[0] for i, col in enumerate(stats_result.columns): stats[col.lower()] = row[i] except Exception as e: logger.warning(f"Failed to get table statistics: {e}") stats = {} return { "name": table, "catalog": catalog, "schema": schema, "columns": columns, "statistics": stats } def cancel_query(self, query_id: str) -> bool: """ Cancel a running query. Args: query_id: The ID of the query to cancel. Returns: bool: True if the query was successfully canceled, False otherwise. """ self.ensure_connection() try: # Use system procedures to cancel the query self.execute_query(f"CALL system.runtime.kill_query(query_id => '{query_id}')") return True except Exception as e: logger.error(f"Failed to cancel query {query_id}: {e}") return False ``` -------------------------------------------------------------------------------- /test_bullshit_query.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Test script to query our bullshit data through the MCP server. This demonstrates that our fix for the catalog handling works by running a complex query. """ import json import subprocess import sys import time def test_bullshit_query(): """Run a query against our bullshit data using the MCP STDIO transport.""" print("🚀 Testing Bullshit Data with MCP STDIO Transport") # Start the MCP server with STDIO transport cmd = [ "docker", "exec", "-i", "trino_mcp_trino-mcp_1", "python", "-m", "trino_mcp.server", "--transport", "stdio", "--debug", "--trino-host", "trino", "--trino-port", "8080", "--trino-user", "trino", "--trino-catalog", "memory" ] try: print("Starting MCP server process with STDIO transport...") process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 # Line buffered ) # Sleep to let the server initialize time.sleep(2) # Helper function to send a request and get a response def send_request(request, expect_response=True): """ Send a request to the MCP server and get the response. Args: request: The JSON-RPC request to send expect_response: Whether to wait for a response Returns: The JSON-RPC response, or None if no response is expected """ request_str = json.dumps(request) + "\n" print(f"\n📤 Sending: {request_str.strip()}") try: process.stdin.write(request_str) process.stdin.flush() except BrokenPipeError: print("❌ Broken pipe - server has closed the connection") return None if not expect_response: print("✅ Sent notification (no response expected)") return None # Read the response print("Waiting for response...") try: response_str = process.stdout.readline() if response_str: print(f"📩 Received response") return json.loads(response_str) else: print("❌ No response received") return None except Exception as e: print(f"❌ Error reading response: {e}") return None # ===== STEP 1: Initialize MCP ===== print("\n===== STEP 1: Initialize MCP =====") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "bullshit-data-query-test", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } init_response = send_request(initialize_request) if not init_response: print("❌ Failed to initialize MCP - exiting test") return # Print server info if "result" in init_response and "serverInfo" in init_response["result"]: server_info = init_response["result"]["serverInfo"] print(f"✅ Connected to server: {server_info.get('name')} {server_info.get('version')}") # ===== STEP 2: Send initialized notification ===== print("\n===== STEP 2: Send initialized notification =====") initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } send_request(initialized_notification, expect_response=False) # ===== STEP 3: Query the bullshit data ===== print("\n===== STEP 3: Query the Bullshit Data =====") query_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": """ SELECT job_title, COUNT(*) as count, AVG(salary) as avg_salary, MAX(salary) as max_salary, AVG(bullshit_factor) as avg_bs_factor FROM memory.bullshit.real_bullshit_data WHERE salary > 150000 GROUP BY job_title HAVING AVG(bullshit_factor) > 5 ORDER BY avg_salary DESC LIMIT 10 """, "catalog": "memory", "schema": "bullshit" } } } query_response = send_request(query_request) if not query_response or "error" in query_response: if "error" in query_response: print(f"❌ Query error: {json.dumps(query_response.get('error', {}), indent=2)}") else: print("❌ Failed to execute query") else: print(f"✅ Bullshit query executed successfully!") # Parse the nested JSON in the content field try: content_text = query_response.get("result", {}).get("content", [{}])[0].get("text", "{}") result_data = json.loads(content_text) # Now we have the actual query result columns = result_data.get("columns", []) row_count = result_data.get("row_count", 0) preview_rows = result_data.get("preview_rows", []) print(f"\nColumns: {', '.join(columns)}") print(f"Row count: {row_count}") print("\n🏆 TOP 10 BULLSHIT JOBS (high salary, high BS factor):") print("-" * 100) # Print header with nice formatting header = " | ".join(f"{col.upper():20}" for col in columns) print(header) print("-" * 100) # Print rows with nice formatting for row in preview_rows: row_str = [] for col in columns: value = row.get(col, "N/A") if isinstance(value, float): row_str.append(f"{value:20.2f}") else: row_str.append(f"{str(value):20}") print(" | ".join(row_str)) except json.JSONDecodeError: print(f"Error parsing result content: {query_response}") except Exception as e: print(f"Error processing result: {e}") print(f"Raw result: {json.dumps(query_response.get('result', {}), indent=2)}") # ===== STEP 4: List Available Schemas ===== print("\n===== STEP 4: List Available Schemas in Memory Catalog =====") schema_query = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SHOW SCHEMAS FROM memory", "catalog": "memory" } } } schema_response = send_request(schema_query) if not schema_response or "error" in schema_response: if "error" in schema_response: print(f"❌ Schema query error: {json.dumps(schema_response.get('error', {}), indent=2)}") else: print("❌ Failed to execute schema query") else: print(f"✅ Schema query executed successfully!") # Parse the nested JSON in the content field try: content_text = schema_response.get("result", {}).get("content", [{}])[0].get("text", "{}") result_data = json.loads(content_text) # Extract schema names preview_rows = result_data.get("preview_rows", []) schema_column = result_data.get("columns", ["Schema"])[0] print("\n🗂️ Available schemas in memory catalog:") for row in preview_rows: schema_name = row.get(schema_column, "unknown") print(f" - {schema_name}") except json.JSONDecodeError: print(f"Error parsing schemas content: {schema_response}") except Exception as e: print(f"Error processing schemas: {e}") print(f"Raw schema result: {json.dumps(schema_response.get('result', {}), indent=2)}") print("\n🎉 Test completed successfully!") except Exception as e: print(f"❌ Error: {e}") finally: # Make sure to terminate the process if 'process' in locals() and process.poll() is None: print("\nTerminating server process...") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: print("Process didn't terminate, killing it...") process.kill() if __name__ == "__main__": test_bullshit_query() ``` -------------------------------------------------------------------------------- /scripts/docker_stdio_test.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Minimalist MCP STDIO test to run inside the container. This script avoids importing any external modules and uses just the Python standard library. """ import json import subprocess import time import sys import threading def test_mcp_stdio(): """Run a test of the MCP server using STDIO transport inside the container.""" print("🚀 Testing MCP with STDIO transport (container version)") # Start the MCP server with STDIO transport # We're directly using the module since we're in the container # Explicitly set the Trino host to trino:8080 server_cmd = [ "python", "-m", "trino_mcp.server", "--transport", "stdio", "--debug", "--trino-host", "trino", "--trino-port", "8080", "--trino-catalog", "memory" ] try: # Start the server in a subprocess process = subprocess.Popen( server_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 # Line buffered ) # Set up a thread to monitor stderr and print it def print_stderr(): while True: line = process.stderr.readline() if not line: break print(f"🔴 SERVER ERROR: {line.strip()}") stderr_thread = threading.Thread(target=print_stderr, daemon=True) stderr_thread.start() print("Starting server process...") # Sleep to allow server to initialize time.sleep(2) # Helper function to send a request and get a response def send_request(request_data, request_desc="", expect_response=True): request_json = json.dumps(request_data) + "\n" print(f"\n📤 Sending {request_desc}: {request_json.strip()}") try: process.stdin.write(request_json) process.stdin.flush() except BrokenPipeError: print(f"❌ Broken pipe when sending {request_desc}") return None # If we don't expect a response (notification), just return if not expect_response: print(f"✅ Sent {request_desc} (no response expected)") return True # Read response with timeout print(f"Waiting for {request_desc} response...") start_time = time.time() timeout = 10 while time.time() - start_time < timeout: # Check if process is still running if process.poll() is not None: print(f"Server process exited with code {process.returncode}") return None # Try to read a line from stdout response_line = process.stdout.readline().strip() if response_line: print(f"📩 Received response: {response_line}") try: return json.loads(response_line) except json.JSONDecodeError as e: print(f"❌ Error parsing response: {e}") # Wait a bit before trying again time.sleep(0.1) print(f"⏱️ Timeout waiting for {request_desc} response") return None # STEP 1: Initialize the server print("\n=== STEP 1: Initialize Server ===") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "container-stdio-test", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } init_response = send_request(initialize_request, "initialize request") if not init_response: raise Exception("Failed to initialize MCP server") server_info = init_response.get("result", {}).get("serverInfo", {}) print(f"✅ Connected to server: {server_info.get('name')} {server_info.get('version')}") # STEP 2: Send initialized notification (no response expected) print("\n=== STEP 2: Send Initialized Notification ===") initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} # Empty params object is required } _ = send_request(initialized_notification, "initialized notification", expect_response=False) # STEP 3: List available tools print("\n=== STEP 3: List Available Tools ===") tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } tools_response = send_request(tools_request, "tools list request") if not tools_response: print("❌ Failed to list tools") else: tools = tools_response.get("result", {}).get("tools", []) print(f"✅ Available tools: {len(tools)}") for tool in tools: print(f" - {tool.get('name')}: {tool.get('description', 'No description')}") # STEP 4: Execute a simple query if tools_response: print("\n=== STEP 4: Execute Simple Query ===") query_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT 'Hello, world!' AS greeting", "catalog": "memory" } } } query_response = send_request(query_request, "query execution") if not query_response: print("❌ Failed to execute query") elif "error" in query_response: print(f"❌ Query error: {json.dumps(query_response.get('error', {}), indent=2)}") else: result = query_response.get("result", {}) print(f"✅ Query executed successfully:") print(f" Columns: {', '.join(result.get('columns', []))}") print(f" Row count: {result.get('row_count', 0)}") print(f" Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") # STEP 5: Try to query a bullshit table print("\n=== STEP 5: Query Bullshit Table ===") bs_query_request = { "jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_data LIMIT 3", "catalog": "memory" } } } bs_query_response = send_request(bs_query_request, "bullshit table query") if not bs_query_response: print("❌ Failed to execute bullshit table query") elif "error" in bs_query_response: print(f"❌ Query error: {json.dumps(bs_query_response.get('error', {}), indent=2)}") else: result = bs_query_response.get("result", {}) print(f"✅ Bullshit query executed successfully:") print(f" Columns: {', '.join(result.get('columns', []))}") print(f" Row count: {result.get('row_count', 0)}") print(f" Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") # STEP 6: Try resources listing print("\n=== STEP 6: List Resources ===") resources_request = { "jsonrpc": "2.0", "id": 5, "method": "resources/list", "params": { "source": "trino://catalog" } } resources_response = send_request(resources_request, "resources list request") if not resources_response: print("❌ Failed to list resources") elif "error" in resources_response: print(f"❌ Resources error: {json.dumps(resources_response.get('error', {}), indent=2)}") else: resources = resources_response.get("result", {}).get("items", []) print(f"✅ Available resources: {len(resources)}") for resource in resources: print(f" - {resource.get('source')}: {resource.get('path')}") # STEP 7: Shutdown print("\n=== STEP 7: Shutdown ===") shutdown_request = { "jsonrpc": "2.0", "id": 6, "method": "shutdown" } shutdown_response = send_request(shutdown_request, "shutdown request") print("✅ Server shutdown request sent") # Send exit notification (no response expected) exit_notification = { "jsonrpc": "2.0", "method": "exit", "params": {} # Empty params may be needed } _ = send_request(exit_notification, "exit notification", expect_response=False) except Exception as e: print(f"❌ Error: {e}") finally: # Make sure to terminate the process if 'process' in locals() and process.poll() is None: print("Terminating server process...") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: print("Process didn't terminate, killing it...") process.kill() print("\n🏁 Test completed!") if __name__ == "__main__": test_mcp_stdio() ``` -------------------------------------------------------------------------------- /test_mcp_stdio.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ STDIO transport test script for Trino MCP. This script demonstrates the end-to-end flow of initializing MCP, listing tools, querying data, and shutting down using the STDIO transport. """ import json import subprocess import sys import time def test_mcp_stdio(): """Run an end-to-end test of Trino MCP using STDIO transport.""" print("🚀 Starting Trino MCP STDIO test") # Start the MCP server with STDIO transport server_cmd = [ "docker", "exec", "-i", "trino_mcp_trino-mcp_1", "python", "-m", "trino_mcp.server", "--transport", "stdio", "--debug", "--trino-host", "trino", "--trino-port", "8080", "--trino-user", "trino", "--trino-catalog", "memory" ] try: print(f"Starting MCP server process: {' '.join(server_cmd)}") process = subprocess.Popen( server_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr, # Pass stderr through to see logs directly text=True, bufsize=1 # Line buffered ) # Sleep a bit to let the server initialize time.sleep(2) # Helper function to send a request and get a response def send_request(request, expect_response=True): """ Send a request to the MCP server and get the response. Args: request: The JSON-RPC request to send expect_response: Whether to wait for a response Returns: The JSON-RPC response, or None if no response is expected """ request_str = json.dumps(request) + "\n" print(f"\n📤 Sending: {request_str.strip()}") try: process.stdin.write(request_str) process.stdin.flush() except BrokenPipeError: print("❌ Broken pipe - server has closed the connection") return None if not expect_response: print("✅ Sent notification (no response expected)") return None # Read the response print("Waiting for response...") try: response_str = process.stdout.readline() if response_str: print(f"📩 Received: {response_str.strip()}") return json.loads(response_str) else: print("❌ No response received") return None except Exception as e: print(f"❌ Error reading response: {e}") return None # ===== STEP 1: Initialize MCP ===== print("\n===== STEP 1: Initialize MCP =====") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "trino-mcp-stdio-test", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } init_response = send_request(initialize_request) if not init_response: print("❌ Failed to initialize MCP - exiting test") return # Print server info if "result" in init_response and "serverInfo" in init_response["result"]: server_info = init_response["result"]["serverInfo"] print(f"✅ Connected to server: {server_info.get('name')} {server_info.get('version')}") # ===== STEP 2: Send initialized notification ===== print("\n===== STEP 2: Send initialized notification =====") initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {} } send_request(initialized_notification, expect_response=False) # ===== STEP 3: List available tools ===== print("\n===== STEP 3: List available tools =====") tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } tools_response = send_request(tools_request) if not tools_response or "result" not in tools_response: print("❌ Failed to get tools list") else: tools = tools_response.get("result", {}).get("tools", []) print(f"✅ Available tools: {len(tools)}") for tool in tools: print(f" - {tool.get('name')}: {tool.get('description', 'No description')[:80]}...") # ===== STEP 4: Execute a simple query ===== print("\n===== STEP 4: Execute a simple query =====") query_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT 'Hello from Trino MCP' AS message", "catalog": "memory" } } } query_response = send_request(query_request) if not query_response: print("❌ Failed to execute query") elif "error" in query_response: print(f"❌ Query error: {json.dumps(query_response.get('error', {}), indent=2)}") else: print(f"✅ Query executed successfully:") if "result" in query_response: result = query_response["result"] if isinstance(result, dict) and "content" in result: # Parse the content text which contains the actual results as a JSON string try: content = result["content"][0]["text"] result_data = json.loads(content) print(f" Query ID: {result_data.get('query_id', 'unknown')}") print(f" Columns: {', '.join(result_data.get('columns', []))}") print(f" Row count: {result_data.get('row_count', 0)}") print(f" Results: {json.dumps(result_data.get('preview_rows', []), indent=2)}") except (json.JSONDecodeError, IndexError) as e: print(f" Raw result: {json.dumps(result, indent=2)}") else: print(f" Raw result: {json.dumps(result, indent=2)}") else: print(f" Raw response: {json.dumps(query_response, indent=2)}") # Try the bullshit table query - this is what the original script wanted print("\n===== STEP 5: Query the Bullshit Table =====") bs_query_request = { "jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_data LIMIT 3", "catalog": "memory" } } } bs_query_response = send_request(bs_query_request) if not bs_query_response: print("❌ Failed to execute bullshit table query") elif "error" in bs_query_response: err = bs_query_response.get("error", {}) if isinstance(err, dict): print(f"❌ Query error: {json.dumps(err, indent=2)}") else: print(f"❌ Query error: {err}") # Try with information_schema as fallback print("\n----- Fallback Query: Checking Available Schemas -----") fallback_query = { "jsonrpc": "2.0", "id": 5, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SHOW SCHEMAS FROM memory", "catalog": "memory" } } } schemas_response = send_request(fallback_query) if schemas_response and "result" in schemas_response: result = schemas_response["result"] if isinstance(result, dict) and "content" in result: try: content = result["content"][0]["text"] result_data = json.loads(content) print(f" Available schemas: {json.dumps(result_data.get('preview_rows', []), indent=2)}") except (json.JSONDecodeError, IndexError) as e: print(f" Raw schemas result: {json.dumps(result, indent=2)}") else: print(f"✅ Bullshit query executed successfully:") if "result" in bs_query_response: result = bs_query_response["result"] if isinstance(result, dict) and "content" in result: try: content = result["content"][0]["text"] result_data = json.loads(content) print(f" Query ID: {result_data.get('query_id', 'unknown')}") print(f" Columns: {', '.join(result_data.get('columns', []))}") print(f" Row count: {result_data.get('row_count', 0)}") print(f" Results: {json.dumps(result_data.get('preview_rows', []), indent=2)}") except (json.JSONDecodeError, IndexError) as e: print(f" Raw result: {json.dumps(result, indent=2)}") else: print(f" Raw result: {json.dumps(result, indent=2)}") else: print(f" Raw response: {json.dumps(bs_query_response, indent=2)}") # Skip the shutdown steps since those cause MCP errors print("\n🎉 Test successful - skipping shutdown to avoid MCP errors") except Exception as e: print(f"❌ Error: {e}") finally: # Make sure to terminate the process if 'process' in locals() and process.poll() is None: print("Terminating server process...") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: print("Process didn't terminate, killing it...") process.kill() print("\n�� Test completed!") if __name__ == "__main__": test_mcp_stdio() ``` -------------------------------------------------------------------------------- /scripts/test_messages.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Simple test script to try connecting to the MCP messages endpoint. This follows the MCP 2024-11-05 specification precisely. """ import json import requests import sys import time import sseclient import signal def handle_exit(signum, frame): """Handle exit gracefully when user presses Ctrl+C.""" print("\nInterrupted. Exiting...") sys.exit(0) # Register signal handler for clean exit signal.signal(signal.SIGINT, handle_exit) def test_mcp(): """ Test the MCP server with standard protocol communication. Follows the MCP specification for 2024-11-05 carefully. """ print("🚀 Testing MCP server following 2024-11-05 specification") # Connect to SSE endpoint print("Connecting to SSE endpoint...") headers = {"Accept": "text/event-stream"} sse_response = requests.get("http://localhost:9096/sse", headers=headers, stream=True) if sse_response.status_code != 200: print(f"❌ Failed to connect to SSE endpoint: {sse_response.status_code}") return print(f"✅ SSE connection established: {sse_response.status_code}") try: client = sseclient.SSEClient(sse_response) # Get the messages URL from the first event messages_url = None session_id = None for event in client.events(): print(f"📩 SSE event: {event.event} - {event.data}") if event.event == "endpoint": messages_url = f"http://localhost:9096{event.data}" # Extract session ID from URL if "session_id=" in event.data: session_id = event.data.split("session_id=")[1] print(f"✅ Got messages URL: {messages_url}") print(f"✅ Session ID: {session_id}") break if not messages_url: print("❌ Failed to get messages URL from SSE") sse_response.close() return # Now we have the messages URL, send initialize request print(f"\n📤 Sending initialize request to {messages_url}") initialize_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "clientInfo": { "name": "mcp-trino-test-client", "version": "1.0.0" }, "capabilities": { "tools": True, "resources": { "supportedSources": ["trino://catalog"] } } } } response = requests.post(messages_url, json=initialize_request) print(f"Status code: {response.status_code}") if response.status_code != 202: print(f"❌ Initialize request failed: {response.text}") sse_response.close() return print(f"✅ Initialize request accepted") # Listen for events and handle protocol properly print("\n🔄 Listening for response events...") # Set up a timeout timeout = time.time() + 60 # 60 seconds timeout # Protocol state tracking status = { "initialized": False, "tools_requested": False, "query_requested": False, "summary_requested": False, "done": False } # Event loop while time.time() < timeout and not status["done"]: events_received = False for event in client.events(): events_received = True # Skip ping events if event.event == "ping": print("📍 Ping event received") continue print(f"\n📩 Received event: {event.event}") # If we get a message event, parse it if event.event == "message" and event.data: try: data = json.loads(event.data) print(f"📦 Parsed message: {json.dumps(data, indent=2)}") # Handle initialize response if "id" in data and data["id"] == 1 and not status["initialized"]: # Send initialized notification (following spec) print("\n📤 Sending initialized notification...") initialized_notification = { "jsonrpc": "2.0", "method": "initialized" } init_response = requests.post(messages_url, json=initialized_notification) if init_response.status_code != 202: print(f"❌ Initialized notification failed: {init_response.status_code}") else: print(f"✅ Initialized notification accepted") status["initialized"] = True # Now request the tools list print("\n📤 Sending tools/list request...") tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } tools_response = requests.post(messages_url, json=tools_request) if tools_response.status_code != 202: print(f"❌ Tools list request failed: {tools_response.status_code}") else: print(f"✅ Tools list request accepted") status["tools_requested"] = True # Handle tools list response elif "id" in data and data["id"] == 2 and not status["query_requested"]: # Extract available tools tools = [] if "result" in data and "tools" in data["result"]: tools = [tool["name"] for tool in data["result"]["tools"]] print(f"🔧 Available tools: {', '.join(tools)}") # Execute a memory query if the execute_query tool is available if "execute_query" in tools: print("\n📤 Sending query for memory.bullshit.bullshit_data...") query_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_data", "catalog": "memory" } } } query_response = requests.post(messages_url, json=query_request) if query_response.status_code != 202: print(f"❌ Query request failed: {query_response.status_code}") else: print(f"✅ Query request accepted") status["query_requested"] = True else: print("❌ execute_query tool not available") status["done"] = True # Handle query response elif "id" in data and data["id"] == 3 and not status["summary_requested"]: # Check if query was successful if "result" in data: print(f"✅ Query succeeded with {data['result'].get('row_count', 0)} rows") # Now query the summary view print("\n📤 Sending query for memory.bullshit.bullshit_summary...") summary_request = { "jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": { "name": "execute_query", "arguments": { "sql": "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC", "catalog": "memory" } } } summary_response = requests.post(messages_url, json=summary_request) if summary_response.status_code != 202: print(f"❌ Summary query request failed: {summary_response.status_code}") else: print(f"✅ Summary query request accepted") status["summary_requested"] = True else: print(f"❌ Query failed: {data.get('error', 'Unknown error')}") status["done"] = True # Handle summary query response elif "id" in data and data["id"] == 4: if "result" in data: print(f"✅ Summary query succeeded with {data['result'].get('row_count', 0)} rows") # Print the summary data nicely formatted if "preview_rows" in data["result"]: for row in data["result"]["preview_rows"]: print(f" {row}") else: print(f"❌ Summary query failed: {data.get('error', 'Unknown error')}") print("\n🏁 All tests completed successfully!") status["done"] = True break except json.JSONDecodeError as e: print(f"❌ Error parsing message: {e}") except Exception as e: print(f"❌ Unexpected error: {e}") # Break out of the event loop if we're done if status["done"]: break # If we didn't receive any events, wait a bit before trying again if not events_received: time.sleep(0.5) # Check if we timed out if time.time() >= timeout and not status["done"]: print("⏱️ Timeout waiting for responses") except KeyboardInterrupt: print("\n🛑 Interrupted by user. Exiting...") except Exception as e: print(f"❌ Error: {e}") finally: # Close the SSE connection print("\n👋 Closing SSE connection...") sse_response.close() print("✅ Connection closed") if __name__ == "__main__": test_mcp() ``` -------------------------------------------------------------------------------- /src/trino_mcp/server.py: -------------------------------------------------------------------------------- ```python """ Main module for the Trino MCP server. """ from __future__ import annotations import argparse import json import sys import os import asyncio import time from contextlib import asynccontextmanager from dataclasses import dataclass from typing import AsyncIterator, Dict, Any, List, Optional import uvicorn from fastapi import FastAPI, Response, Body from fastapi.responses import JSONResponse from pydantic import BaseModel from loguru import logger from mcp.server.fastmcp import Context, FastMCP from trino_mcp.config import ServerConfig, TrinoConfig from trino_mcp.resources import register_trino_resources from trino_mcp.tools import register_trino_tools from trino_mcp.trino_client import TrinoClient # Global app context for health check access app_context_global = None @dataclass class AppContext: """Application context passed to all MCP handlers.""" trino_client: TrinoClient config: ServerConfig is_healthy: bool = True # Models for the LLM API class QueryRequest(BaseModel): """Model for query requests.""" query: str catalog: str = "memory" schema: Optional[str] = None explain: bool = False class QueryResponse(BaseModel): """Model for query responses.""" success: bool message: str results: Optional[Dict[str, Any]] = None @asynccontextmanager async def app_lifespan(mcp: FastMCP) -> AsyncIterator[AppContext]: """ Manage the application lifecycle. Args: mcp: The MCP server instance. Yields: AppContext: The application context with initialized services. """ global app_context_global logger.info("Initializing Trino MCP server") # Get server configuration from environment or command line config = parse_args() # Initialize Trino client trino_client = TrinoClient(config.trino) # Create and set global app context app_context = AppContext(trino_client=trino_client, config=config) app_context_global = app_context try: # Connect to Trino logger.info(f"Connecting to Trino at {config.trino.host}:{config.trino.port}") trino_client.connect() # Register resources and tools logger.info("Registering resources and tools") register_trino_resources(mcp, trino_client) register_trino_tools(mcp, trino_client) # Yield the application context logger.info("Trino MCP server initialized and ready") yield app_context except Exception as e: logger.error(f"Failed to initialize: {e}") app_context.is_healthy = False yield app_context finally: # Cleanup on shutdown logger.info("Shutting down Trino MCP server") if trino_client.conn: trino_client.disconnect() app_context.is_healthy = False def parse_args() -> ServerConfig: """ Parse command line arguments and return server configuration. Returns: ServerConfig: The server configuration. """ parser = argparse.ArgumentParser(description="Trino MCP server") # Server configuration parser.add_argument("--name", default="Trino MCP", help="Server name") parser.add_argument("--version", default="0.1.0", help="Server version") parser.add_argument("--transport", default="stdio", choices=["stdio", "sse"], help="Transport type") parser.add_argument("--host", default="127.0.0.1", help="Host for HTTP server (SSE transport only)") parser.add_argument("--port", type=int, default=3000, help="Port for HTTP server (SSE transport only)") parser.add_argument("--debug", action="store_true", help="Enable debug mode") # Trino connection parser.add_argument("--trino-host", default="localhost", help="Trino host") parser.add_argument("--trino-port", type=int, default=8080, help="Trino port") parser.add_argument("--trino-user", default="trino", help="Trino user") parser.add_argument("--trino-password", help="Trino password") parser.add_argument("--trino-catalog", help="Default Trino catalog") parser.add_argument("--trino-schema", help="Default Trino schema") parser.add_argument("--trino-http-scheme", default="http", help="Trino HTTP scheme") args = parser.parse_args() # Create Trino configuration trino_config = TrinoConfig( host=args.trino_host, port=args.trino_port, user=args.trino_user, password=args.trino_password, catalog=args.trino_catalog, schema=args.trino_schema, http_scheme=args.trino_http_scheme ) # Create server configuration server_config = ServerConfig( name=args.name, version=args.version, transport_type=args.transport, host=args.host, port=args.port, debug=args.debug, trino=trino_config ) return server_config def create_app() -> FastMCP: """ Create and configure the MCP server application. Returns: FastMCP: The configured MCP server. """ # Create the MCP server with lifespan management mcp = FastMCP( "Trino MCP", dependencies=["trino>=0.329.0"], lifespan=app_lifespan ) return mcp def create_health_app() -> FastAPI: """ Create a FastAPI app that provides a health check endpoint and LLM API. This function creates a FastAPI app with a health check endpoint and a query endpoint for LLMs to use. Returns: FastAPI: The FastAPI app with health check and LLM API endpoints. """ app = FastAPI( title="Trino MCP API", description="API for health checks and LLM query access to Trino MCP", version="0.1.0" ) @app.get("/health") async def health(): global app_context_global # For Docker health check, always return 200 during startup # This gives the app time to initialize return JSONResponse( status_code=200, content={"status": "ok", "message": "Health check endpoint is responding"} ) @app.post("/api/query", response_model=QueryResponse) async def query(request: QueryRequest): """ Execute a SQL query against Trino and return results. This endpoint is designed to be used by LLMs to query Trino through MCP. """ global app_context_global if not app_context_global or not app_context_global.is_healthy: return JSONResponse( status_code=503, content={ "success": False, "message": "Trino MCP server is not healthy or not initialized" } ) logger.info(f"LLM API Query: {request.query}") try: # Use the Trino client from the app context client = app_context_global.trino_client # Optionally add EXPLAIN query = request.query if request.explain: query = f"EXPLAIN {query}" # Execute the query result = client.execute_query(query, request.catalog, request.schema) # Format the results for the response formatted_rows = [] for row in result.rows: # Convert row to dict using column names row_dict = {} for i, col in enumerate(result.columns): row_dict[col] = row[i] formatted_rows.append(row_dict) return { "success": True, "message": "Query executed successfully", "results": { "query_id": result.query_id, "columns": result.columns, "rows": formatted_rows, "row_count": result.row_count, "execution_time_ms": result.query_time_ms } } except Exception as e: logger.error(f"Error executing query: {e}") return JSONResponse( status_code=400, content={ "success": False, "message": f"Error executing query: {str(e)}" } ) @app.get("/api") async def api_root(): """Root API endpoint with usage instructions.""" return { "message": "Trino MCP API for LLMs", "version": app_context_global.config.version if app_context_global else "unknown", "endpoints": { "health": "GET /health - Check server health", "query": "POST /api/query - Execute SQL queries" }, "query_example": { "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3", "catalog": "memory", "schema": "bullshit" } } return app def main() -> None: """ Main entry point for the server. """ config = parse_args() mcp = create_app() # ADDING EXPLICIT CONTEXT INITIALIZATION HERE global app_context_global try: # Initialize the Trino client logger.info(f"Connecting to Trino at {config.trino.host}:{config.trino.port}") trino_client = TrinoClient(config.trino) trino_client.connect() # Create application context app_context = AppContext( trino_client=trino_client, config=config, is_healthy=True ) # Set global context app_context_global = app_context # Register resources and tools register_trino_resources(mcp, trino_client) register_trino_tools(mcp, trino_client) logger.info("Trino MCP server initialized and ready") except Exception as e: logger.error(f"Error initializing Trino MCP: {e}") if app_context_global: app_context_global.is_healthy = False if config.transport_type == "stdio": # For STDIO transport, run directly logger.info("Starting Trino MCP server with STDIO transport") mcp.run() else: # For SSE transport, use run_sse_async method from MCP library logger.info(f"Starting Trino MCP server with SSE transport on {config.host}:{config.port}") # In MCP 1.3.0, run_sse_async takes no arguments # We set the environment variables to configure the host and port os.environ["MCP_HOST"] = config.host os.environ["MCP_PORT"] = str(config.port) # Configure more robust error handling for the server import traceback try: # Try to import and configure SSE settings if available in this version from mcp.server.sse import configure_sse configure_sse(ignore_client_disconnect=True) logger.info("Configured SSE with ignore_client_disconnect=True") except (ImportError, AttributeError): logger.warning("Could not configure SSE settings - this may be expected in some MCP versions") # Start a separate thread for the health check endpoint import threading def run_health_check(): """Run the health check FastAPI app.""" health_app = create_health_app() # Use a different port for the health check endpoint health_port = config.port + 1 logger.info(f"Starting API server on port {health_port}") uvicorn.run(health_app, host=config.host, port=health_port) # Start the health check in a separate thread health_thread = threading.Thread(target=run_health_check) health_thread.daemon = True health_thread.start() # Now run the SSE server with robust error handling try: asyncio.run(mcp.run_sse_async()) except RuntimeError as e: if "generator didn't stop after athrow()" in str(e): logger.error(f"Generator error in SSE server. This is a known issue with MCP 1.3.0: {e}") # Set unhealthy status for health checks if app_context_global: app_context_global.is_healthy = False logger.info("Server will continue running but may not function correctly.") # Keep the server alive despite the error import time while True: time.sleep(60) # Sleep to keep the container running else: logger.error(f"Fatal error running SSE server: {e}") logger.error(traceback.format_exc()) raise except Exception as e: logger.error(f"Fatal error running SSE server: {e}") logger.error(traceback.format_exc()) raise if __name__ == "__main__": logger.remove() logger.add(sys.stderr, level="DEBUG") main() ```