This is page 1 of 2. Use http://codebase.md/stinkgen/trino_mcp?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .gitignore ├── CHANGELOG.md ├── docker-compose.yml ├── Dockerfile ├── etc │ ├── catalog │ │ ├── bullshit.properties │ │ └── memory.properties │ ├── config.properties │ ├── jvm.config │ └── node.properties ├── examples │ └── simple_mcp_query.py ├── LICENSE ├── llm_query_trino.py ├── llm_trino_api.py ├── load_bullshit_data.py ├── openapi.json ├── pyproject.toml ├── pytest.ini ├── README.md ├── requirements-dev.txt ├── run_tests.sh ├── scripts │ ├── docker_stdio_test.py │ ├── fix_trino_session.py │ ├── test_direct_query.py │ ├── test_fixed_client.py │ ├── test_messages.py │ ├── test_quick_query.py │ └── test_stdio_trino.py ├── src │ └── trino_mcp │ ├── __init__.py │ ├── config.py │ ├── resources │ │ └── __init__.py │ ├── server.py │ ├── tools │ │ └── __init__.py │ └── trino_client.py ├── test_bullshit_query.py ├── test_llm_api.py ├── test_mcp_stdio.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── integration │ │ └── __init__.py │ └── test_client.py ├── tools │ ├── create_bullshit_data.py │ ├── run_queries.sh │ ├── setup │ │ ├── setup_data.sh │ │ └── setup_tables.sql │ └── setup_bullshit_table.py └── trino-conf ├── catalog │ └── memory.properties ├── config.properties ├── jvm.config └── node.properties ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python 2 | __pycache__/ 3 | *.py[cod] 4 | *$py.class 5 | *.so 6 | .Python 7 | build/ 8 | develop-eggs/ 9 | dist/ 10 | downloads/ 11 | eggs/ 12 | .eggs/ 13 | lib/ 14 | lib64/ 15 | parts/ 16 | sdist/ 17 | var/ 18 | wheels/ 19 | *.egg-info/ 20 | .installed.cfg 21 | *.egg 22 | 23 | # Virtual environments 24 | venv/ 25 | env/ 26 | ENV/ 27 | 28 | # Temp 29 | temp/ 30 | 31 | # IDE files 32 | .idea/ 33 | .vscode/ 34 | .cursor/ 35 | *.swp 36 | 
*.swo 37 | 38 | # Logs and databases 39 | *.log 40 | *.sqlite3 41 | logs/ 42 | 43 | # Project-specific 44 | data/ 45 | archive/ 46 | 47 | # OS specific files 48 | .DS_Store 49 | Thumbs.db ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Trino MCP Server 2 | 3 | Model Context Protocol server for Trino, providing AI models with structured access to Trino's distributed SQL query engine. 4 | 5 | ⚠️ **BETA RELEASE (v0.1.2)** ⚠️ 6 | This project is stabilizing with core features working and tested. Feel free to fork and contribute! 7 | 8 | ## Features 9 | 10 | - ✅ Fixed Docker container API initialization issue! (reliable server initialization) 11 | - ✅ Exposes Trino resources through MCP protocol 12 | - ✅ Enables AI tools to query and analyze data in Trino 13 | - ✅ Provides transport options (STDIO transport works reliably; SSE transport has issues) 14 | - ✅ Fixed catalog handling for proper Trino query execution 15 | - ✅ Both Docker container API and standalone Python API server options 16 | 17 | ## Quick Start 18 | 19 | ```bash 20 | # Start the server with docker-compose 21 | docker-compose up -d 22 | 23 | # Verify the API is working 24 | curl -X POST "http://localhost:9097/api/query" \ 25 | -H "Content-Type: application/json" \ 26 | -d '{"query": "SELECT 1 AS test"}' 27 | ``` 28 | 29 | Need a non-containerized version? Run the standalone API: 30 | 31 | ```bash 32 | # Run the standalone API server on port 8008 33 | python llm_trino_api.py 34 | ``` 35 | 36 | ## LLM Integration 37 | 38 | Want to give an LLM direct access to query your Trino instance? We've created simple tools for that! 
39 | 40 | ### Command-Line LLM Interface 41 | 42 | The simplest way to let an LLM query Trino is through our command-line tool: 43 | 44 | ```bash 45 | # Simple direct query (perfect for LLMs) 46 | python llm_query_trino.py "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5" 47 | 48 | # Specify a different catalog or schema 49 | python llm_query_trino.py "SELECT * FROM information_schema.tables" memory information_schema 50 | ``` 51 | 52 | ### REST API for LLMs 53 | 54 | We offer two API options for integration with LLM applications: 55 | 56 | #### 1. Docker Container API (Port 9097) 57 | 58 | The Docker container exposes a REST API on port 9097: 59 | 60 | ```bash 61 | # Execute a query against the Docker container API 62 | curl -X POST "http://localhost:9097/api/query" \ 63 | -H "Content-Type: application/json" \ 64 | -d '{"query": "SELECT 1 AS test"}' 65 | ``` 66 | 67 | #### 2. Standalone Python API (Port 8008) 68 | 69 | For more flexible deployments, run the standalone API server: 70 | 71 | ```bash 72 | # Start the API server on port 8008 73 | python llm_trino_api.py 74 | ``` 75 | 76 | This creates endpoints at: 77 | - `GET http://localhost:8008/` - API usage info 78 | - `POST http://localhost:8008/query` - Execute SQL queries 79 | 80 | You can then have your LLM make HTTP requests to this endpoint: 81 | 82 | ```python 83 | # Example code an LLM might generate 84 | import requests 85 | 86 | def query_trino(sql_query): 87 | response = requests.post( 88 | "http://localhost:8008/query", 89 | json={"query": sql_query} 90 | ) 91 | return response.json() 92 | 93 | # LLM-generated query 94 | results = query_trino("SELECT job_title, AVG(salary) FROM memory.bullshit.real_bullshit_data GROUP BY job_title ORDER BY AVG(salary) DESC LIMIT 5") 95 | print(results["formatted_results"]) 96 | ``` 97 | 98 | This approach allows LLMs to focus on generating SQL, while our tools handle all the MCP protocol complexity! 
99 | 100 | ## Demo and Validation Scripts 🚀 101 | 102 | We've created some badass demo scripts that show how AI models can use the MCP protocol to run complex queries against Trino: 103 | 104 | ### 1. Bullshit Data Generation and Loading 105 | 106 | The `tools/create_bullshit_data.py` script generates a dataset of 10,000 employees with ridiculous job titles, inflated salaries, and a "bullshit factor" rating (1-10): 107 | 108 | ```bash 109 | # Generate the bullshit data 110 | python tools/create_bullshit_data.py 111 | 112 | # Load the bullshit data into Trino's memory catalog 113 | python load_bullshit_data.py 114 | ``` 115 | 116 | ### 2. Running Complex Queries through MCP 117 | 118 | The `test_bullshit_query.py` script demonstrates end-to-end MCP interaction: 119 | - Connects to the MCP server using STDIO transport 120 | - Initializes the protocol following the MCP spec 121 | - Runs a complex SQL query with WHERE, GROUP BY, HAVING, ORDER BY 122 | - Processes and formats the results 123 | 124 | ```bash 125 | # Run a complex query against the bullshit data through MCP 126 | python test_bullshit_query.py 127 | ``` 128 | 129 | Example output showing top BS jobs with high salaries: 130 | ``` 131 | 🏆 TOP 10 BULLSHIT JOBS (high salary, high BS factor): 132 | ---------------------------------------------------------------------------------------------------- 133 | JOB_TITLE | COUNT | AVG_SALARY | MAX_SALARY | AVG_BS_FACTOR 134 | ---------------------------------------------------------------------------------------------------- 135 | Advanced Innovation Jedi | 2 | 241178.50 | 243458.00 | 7.50 136 | VP of Digital Officer | 1 | 235384.00 | 235384.00 | 7.00 137 | Innovation Technical Architect | 1 | 235210.00 | 235210.00 | 9.00 138 | ...and more! 139 | ``` 140 | 141 | ### 3. 
API Testing 142 | 143 | The `test_llm_api.py` script validates the API functionality: 144 | 145 | ```bash 146 | # Test the Docker container API 147 | python test_llm_api.py 148 | ``` 149 | 150 | This performs a comprehensive check of: 151 | - API endpoint discovery 152 | - Documentation availability 153 | - Valid query execution 154 | - Error handling for invalid queries 155 | 156 | ## Usage 157 | 158 | ```bash 159 | # Start the server with docker-compose 160 | docker-compose up -d 161 | ``` 162 | 163 | The server will be available at: 164 | - Trino: http://localhost:9095 165 | - MCP server: http://localhost:9096 166 | - API server: http://localhost:9097 167 | 168 | ## Client Connection 169 | 170 | ✅ **IMPORTANT**: The client scripts run on your local machine (OUTSIDE Docker) and connect TO the Docker containers. The scripts automatically handle this by using docker exec commands. You don't need to be inside the container to use MCP! 171 | 172 | Running tests from your local machine: 173 | 174 | ```bash 175 | # Generate and load data into Trino 176 | python tools/create_bullshit_data.py # Generates data locally 177 | python load_bullshit_data.py # Loads data to Trino in Docker 178 | 179 | # Run MCP query through Docker 180 | python test_bullshit_query.py # Queries using MCP in Docker 181 | ``` 182 | 183 | ## Transport Options 184 | 185 | This server supports two transport methods, but only STDIO is currently reliable: 186 | 187 | ### STDIO Transport (Recommended and Working) 188 | 189 | STDIO transport works reliably and is currently the only recommended method for testing and development: 190 | 191 | ```bash 192 | # Run with STDIO transport inside the container 193 | docker exec -i trino_mcp_trino-mcp_1 python -m trino_mcp.server --transport stdio --debug --trino-host trino --trino-port 8080 --trino-user trino --trino-catalog memory 194 | ``` 195 | 196 | ### SSE Transport (NOT RECOMMENDED - Has Critical Issues) 197 | 198 | SSE is the default transport in MCP but 
has serious issues with the current MCP 1.3.0 version, causing server crashes on client disconnections. **Not recommended for use until these issues are resolved**: 199 | 200 | ```bash 201 | # NOT RECOMMENDED: Run with SSE transport (crashes on disconnection) 202 | docker exec trino_mcp_trino-mcp_1 python -m trino_mcp.server --transport sse --host 0.0.0.0 --port 8000 --debug 203 | ``` 204 | 205 | ## Known Issues and Fixes 206 | 207 | ### Fixed: Docker Container API Initialization 208 | 209 | ✅ **FIXED**: We've resolved an issue where the API in the Docker container returned 503 Service Unavailable responses. The problem was with the `app_lifespan` function not properly initializing the `app_context_global` and Trino client connection. The fix ensures that: 210 | 211 | 1. The Trino client explicitly connects during startup 212 | 2. The AppContext global variable is properly initialized 213 | 3. Health checks now work correctly 214 | 215 | If you encounter 503 errors, check that your container has been rebuilt with the latest code: 216 | 217 | ```bash 218 | # Rebuild and restart the container with the fix 219 | docker-compose stop trino-mcp 220 | docker-compose rm -f trino-mcp 221 | docker-compose up -d trino-mcp 222 | ``` 223 | 224 | ### MCP 1.3.0 SSE Transport Crashes 225 | 226 | There's a critical issue with MCP 1.3.0's SSE transport that causes server crashes when clients disconnect. Until a newer MCP version is integrated, use STDIO transport exclusively. The error manifests as: 227 | 228 | ``` 229 | RuntimeError: generator didn't stop after athrow() 230 | anyio.BrokenResourceError 231 | ``` 232 | 233 | ### Trino Catalog Handling 234 | 235 | We fixed an issue with catalog handling in the Trino client. The original implementation attempted to use `USE catalog` statements, which don't work reliably. The fix directly sets the catalog in the connection parameters. 
236 | 237 | ## Project Structure 238 | 239 | This project is organized as follows: 240 | 241 | - `src/` - Main source code for the Trino MCP server 242 | - `examples/` - Simple examples showing how to use the server 243 | - `scripts/` - Useful diagnostic and testing scripts 244 | - `tools/` - Utility scripts for data creation and setup 245 | - `tests/` - Automated tests 246 | 247 | Key files: 248 | - `llm_trino_api.py` - Standalone API server for LLM integration 249 | - `test_llm_api.py` - Test script for the API server 250 | - `test_mcp_stdio.py` - Main test script using STDIO transport (recommended) 251 | - `test_bullshit_query.py` - Complex query example with bullshit data 252 | - `load_bullshit_data.py` - Script to load generated data into Trino 253 | - `tools/create_bullshit_data.py` - Script to generate hilarious test data 254 | - `run_tests.sh` - Script to run automated tests 255 | - `examples/simple_mcp_query.py` - Simple example to query data using MCP 256 | 257 | ## Development 258 | 259 | **IMPORTANT**: All scripts can be run from your local machine - they'll automatically communicate with the Docker containers via docker exec commands! 260 | 261 | ```bash 262 | # Install development dependencies 263 | pip install -e ".[dev]" 264 | 265 | # Run automated tests 266 | ./run_tests.sh 267 | 268 | # Test MCP with STDIO transport (recommended) 269 | python test_mcp_stdio.py 270 | 271 | # Simple example query 272 | python examples/simple_mcp_query.py "SELECT 'Hello World' AS message" 273 | ``` 274 | 275 | ## Testing 276 | 277 | To test that Trino queries are working correctly, use the STDIO transport test script: 278 | 279 | ```bash 280 | # Recommended test method (STDIO transport) 281 | python test_mcp_stdio.py 282 | ``` 283 | 284 | For more complex testing with the bullshit data: 285 | ```bash 286 | # Load and query the bullshit data (shows the full power of Trino MCP!) 
287 | python load_bullshit_data.py 288 | python test_bullshit_query.py 289 | ``` 290 | 291 | For testing the LLM API endpoint: 292 | ```bash 293 | # Test the Docker container API 294 | python test_llm_api.py 295 | 296 | # Test the standalone API (make sure it's running first) 297 | python llm_trino_api.py 298 | curl -X POST "http://localhost:8008/query" \ 299 | -H "Content-Type: application/json" \ 300 | -d '{"query": "SELECT 1 AS test"}' 301 | ``` 302 | 303 | ## How LLMs Can Use This 304 | 305 | LLMs can use the Trino MCP server to: 306 | 307 | 1. **Get Database Schema Information**: 308 | ```python 309 | # Example prompt to LLM: "What schemas are available in the memory catalog?" 310 | # LLM can generate code to query: 311 | query = "SHOW SCHEMAS FROM memory" 312 | ``` 313 | 314 | 2. **Run Complex Analytical Queries**: 315 | ```python 316 | # Example prompt: "Find the top 5 job titles with highest average salaries" 317 | # LLM can generate complex SQL: 318 | query = """ 319 | SELECT 320 | job_title, 321 | AVG(salary) as avg_salary 322 | FROM 323 | memory.bullshit.real_bullshit_data 324 | GROUP BY 325 | job_title 326 | ORDER BY 327 | avg_salary DESC 328 | LIMIT 5 329 | """ 330 | ``` 331 | 332 | 3. 
**Perform Data Analysis and Present Results**: 333 | ```python 334 | # LLM can parse the response, extract insights and present to user: 335 | "The highest paying job title is 'Advanced Innovation Jedi' with an average salary of $241,178.50" 336 | ``` 337 | 338 | ### Real LLM Analysis Example: Bullshit Jobs by Company 339 | 340 | Here's a real example of what an LLM could produce when asked to "Identify the companies with the most employees in bullshit jobs and create a Mermaid chart": 341 | 342 | #### Step 1: LLM generates and runs the query 343 | 344 | ```sql 345 | SELECT 346 | company, 347 | COUNT(*) as employee_count, 348 | AVG(bullshit_factor) as avg_bs_factor 349 | FROM 350 | memory.bullshit.real_bullshit_data 351 | WHERE 352 | bullshit_factor > 7 353 | GROUP BY 354 | company 355 | ORDER BY 356 | employee_count DESC, 357 | avg_bs_factor DESC 358 | LIMIT 10 359 | ``` 360 | 361 | #### Step 2: LLM gets and analyzes the results 362 | 363 | ``` 364 | COMPANY | EMPLOYEE_COUNT | AVG_BS_FACTOR 365 | ---------------------------------------- 366 | Unknown Co | 2 | 9.0 367 | BitEdge | 1 | 10.0 368 | CyberWare | 1 | 10.0 369 | BitLink | 1 | 10.0 370 | AlgoMatrix | 1 | 10.0 371 | CryptoHub | 1 | 10.0 372 | BitGrid | 1 | 10.0 373 | MLStream | 1 | 10.0 374 | CloudCube | 1 | 10.0 375 | UltraEdge | 1 | 10.0 376 | ``` 377 | 378 | #### Step 3: LLM generates a Mermaid chart visualization 379 | 380 | ```mermaid 381 | %%{init: {'theme': 'forest'}}%% 382 | graph LR 383 | title[Companies with Most Bullshit Jobs] 384 | style title fill:#333,stroke:#333,stroke-width:1px,color:white,font-weight:bold,font-size:18px 385 | 386 | Companies --> UnknownCo[Unknown Co] 387 | Companies --> BitEdge[BitEdge] 388 | Companies --> CyberWare[CyberWare] 389 | Companies --> BitLink[BitLink] 390 | Companies --> AlgoMatrix[AlgoMatrix] 391 | Companies --> CryptoHub[CryptoHub] 392 | Companies --> BitGrid[BitGrid] 393 | Companies --> MLStream[MLStream] 394 | Companies --> CloudCube[CloudCube] 395 | 
Companies --> UltraEdge[UltraEdge] 396 | 397 | UnknownCo --- Count2[2 employees] 398 | BitEdge --- Count1a[1 employee] 399 | CyberWare --- Count1b[1 employee] 400 | BitLink --- Count1c[1 employee] 401 | AlgoMatrix --- Count1d[1 employee] 402 | CryptoHub --- Count1e[1 employee] 403 | BitGrid --- Count1f[1 employee] 404 | MLStream --- Count1g[1 employee] 405 | CloudCube --- Count1h[1 employee] 406 | UltraEdge --- Count1i[1 employee] 407 | 408 | classDef company fill:#ff5733,stroke:#333,stroke-width:1px,color:white,font-weight:bold; 409 | classDef count fill:#006100,stroke:#333,stroke-width:1px,color:white,font-weight:bold; 410 | 411 | class UnknownCo,BitEdge,CyberWare,BitLink,AlgoMatrix,CryptoHub,BitGrid,MLStream,CloudCube,UltraEdge company; 412 | class Count2,Count1a,Count1b,Count1c,Count1d,Count1e,Count1f,Count1g,Count1h,Count1i count; 413 | ``` 414 | 415 | **Alternative Bar Chart:** 416 | 417 | ```mermaid 418 | %%{init: {'theme': 'default'}}%% 419 | pie showData 420 | title Companies with Bullshit Jobs 421 | "Unknown Co (BS: 9.0)" : 2 422 | "BitEdge (BS: 10.0)" : 1 423 | "CyberWare (BS: 10.0)" : 1 424 | "BitLink (BS: 10.0)" : 1 425 | "AlgoMatrix (BS: 10.0)" : 1 426 | "CryptoHub (BS: 10.0)" : 1 427 | "BitGrid (BS: 10.0)" : 1 428 | "MLStream (BS: 10.0)" : 1 429 | "CloudCube (BS: 10.0)" : 1 430 | "UltraEdge (BS: 10.0)" : 1 431 | ``` 432 | 433 | #### Step 4: LLM provides key insights 434 | 435 | The LLM can analyze the data and provide insights: 436 | 437 | - "Unknown Co" has the most employees in bullshit roles (2), while all others have just one 438 | - Most companies have achieved a perfect 10.0 bullshit factor score 439 | - Tech-focused companies (BitEdge, CyberWare, etc.) seem to create particularly meaningless roles 440 | - Bullshit roles appear concentrated at executive or specialized position levels 441 | 442 | This example demonstrates how an LLM can: 443 | 1. Generate appropriate SQL queries based on natural language questions 444 | 2. 
Process and interpret the results from Trino 445 | 3. Create visual representations of the data 446 | 4. Provide meaningful insights and analysis 447 | 448 | ## Accessing the API 449 | 450 | The Trino MCP server now includes two API options for accessing data: 451 | 452 | ### 1. Docker Container API (Port 9097) 453 | 454 | ```python 455 | import requests 456 | import json 457 | 458 | # API endpoint (default port 9097 in Docker setup) 459 | api_url = "http://localhost:9097/api/query" 460 | 461 | # Define your SQL query 462 | query_data = { 463 | "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5", 464 | "catalog": "memory", 465 | "schema": "bullshit" 466 | } 467 | 468 | # Send the request 469 | response = requests.post(api_url, json=query_data) 470 | results = response.json() 471 | 472 | # Process the results 473 | if results["success"]: 474 | print(f"Query returned {results['results']['row_count']} rows") 475 | for row in results['results']['rows']: 476 | print(row) 477 | else: 478 | print(f"Query failed: {results['message']}") 479 | ``` 480 | 481 | ### 2. Standalone Python API (Port 8008) 482 | 483 | ```python 484 | # Same code as above, but with a different port and endpoint path 485 | api_url = "http://localhost:8008/query" 486 | ``` 487 | 488 | Each API offers a documentation endpoint and a query endpoint: 489 | - Docker container API: `GET /api` for documentation, `POST /api/query` to execute SQL 490 | - Standalone API: `GET /` for documentation, `POST /query` to execute SQL 491 | 492 | These APIs eliminate the need for wrapper scripts and let LLMs query Trino directly using REST calls, making it much simpler to integrate with services like Claude, GPT, and other AI systems. 493 | 494 | ## Troubleshooting 495 | 496 | ### API Returns 503 Service Unavailable 497 | 498 | If the Docker container API returns 503 errors: 499 | 500 | 1. Make sure you've rebuilt the container with the latest code: 501 | ```bash 502 | docker-compose stop trino-mcp 503 | docker-compose rm -f trino-mcp 504 | docker-compose up -d trino-mcp 505 | ``` 506 | 507 | 2. 
Check the container logs for errors: 508 | ```bash 509 | docker logs trino_mcp_trino-mcp_1 510 | ``` 511 | 512 | 3. Verify that Trino is running properly: 513 | ```bash 514 | curl -s http://localhost:9095/v1/info | jq 515 | ``` 516 | 517 | ### Port Conflicts with Standalone API 518 | 519 | The standalone API defaults to port 8008 to avoid conflicts. If you see an "address already in use" error: 520 | 521 | 1. Edit `llm_trino_api.py` and change the port number in the last line: 522 | ```python 523 | uvicorn.run(app, host="127.0.0.1", port=8008) 524 | ``` 525 | 526 | 2. Run with a custom port via command line: 527 | ```bash 528 | python -c "import llm_trino_api; import uvicorn; uvicorn.run(llm_trino_api.app, host='127.0.0.1', port=8009)" 529 | ``` 530 | 531 | ## Future Work 532 | 533 | This is now in beta with these improvements planned: 534 | 535 | - [ ] Integrate with newer MCP versions when available to fix SSE transport issues 536 | - [ ] Add/Validate support for Hive, JDBC, and other connectors 537 | - [ ] Add more comprehensive query validation across different types and complexities 538 | - [ ] Implement support for more data types and advanced Trino features 539 | - [ ] Improve error handling and recovery mechanisms 540 | - [ ] Add user authentication and permission controls 541 | - [ ] Create more comprehensive examples and documentation 542 | - [ ] Develop admin monitoring and management interfaces 543 | - [ ] Add performance metrics and query optimization hints 544 | - [ ] Implement support for long-running queries and result streaming 545 | 546 | --- 547 | 548 | *Developed by Stink Labs, 2025* 549 | ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Test package for the Trino MCP server. 
3 | """ ``` -------------------------------------------------------------------------------- /etc/catalog/memory.properties: -------------------------------------------------------------------------------- ``` 1 | connector.name=memory 2 | memory.max-data-per-node=512MB ``` -------------------------------------------------------------------------------- /trino-conf/catalog/memory.properties: -------------------------------------------------------------------------------- ``` 1 | connector.name=memory 2 | memory.max-data-per-node=512MB ``` -------------------------------------------------------------------------------- /tests/integration/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Integration test package for the Trino MCP server. 3 | """ ``` -------------------------------------------------------------------------------- /etc/config.properties: -------------------------------------------------------------------------------- ``` 1 | coordinator=true 2 | node-scheduler.include-coordinator=true 3 | http-server.http.port=8080 4 | discovery.uri=http://localhost:8080 ``` -------------------------------------------------------------------------------- /trino-conf/config.properties: -------------------------------------------------------------------------------- ``` 1 | coordinator=true 2 | node-scheduler.include-coordinator=true 3 | http-server.http.port=8080 4 | discovery.uri=http://localhost:8080 ``` -------------------------------------------------------------------------------- /etc/node.properties: -------------------------------------------------------------------------------- ``` 1 | node.environment=production 2 | node.data-dir=/data/trino 3 | node.server-log-file=/var/log/trino/server.log 4 | node.launcher-log-file=/var/log/trino/launcher.log ``` -------------------------------------------------------------------------------- /trino-conf/node.properties: 
-------------------------------------------------------------------------------- ``` 1 | node.environment=production 2 | node.data-dir=/data/trino 3 | node.server-log-file=/var/log/trino/server.log 4 | node.launcher-log-file=/var/log/trino/launcher.log ``` -------------------------------------------------------------------------------- /etc/jvm.config: -------------------------------------------------------------------------------- ``` 1 | -server 2 | -Xmx2G 3 | -XX:+UseG1GC 4 | -XX:G1HeapRegionSize=32M 5 | -XX:+UseGCOverheadLimit 6 | -XX:+ExplicitGCInvokesConcurrent 7 | -XX:+HeapDumpOnOutOfMemoryError 8 | -XX:+ExitOnOutOfMemoryError 9 | -Djdk.attach.allowAttachSelf=true ``` -------------------------------------------------------------------------------- /trino-conf/jvm.config: -------------------------------------------------------------------------------- ``` 1 | -server 2 | -Xmx2G 3 | -XX:+UseG1GC 4 | -XX:G1HeapRegionSize=32M 5 | -XX:+UseGCOverheadLimit 6 | -XX:+ExplicitGCInvokesConcurrent 7 | -XX:+HeapDumpOnOutOfMemoryError 8 | -XX:+ExitOnOutOfMemoryError 9 | -Djdk.attach.allowAttachSelf=true ``` -------------------------------------------------------------------------------- /requirements-dev.txt: -------------------------------------------------------------------------------- ``` 1 | # Development dependencies 2 | pytest>=7.3.1 3 | pytest-cov>=4.1.0 4 | black>=23.0.0 5 | isort>=5.12.0 6 | mypy>=1.4.0 7 | 8 | # SSE client for testing 9 | sseclient-py>=1.7.2 10 | 11 | # HTTP client 12 | requests>=2.28.0 13 | 14 | # Type checking 15 | types-requests>=2.28.0 ``` -------------------------------------------------------------------------------- /src/trino_mcp/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Trino MCP server package. 3 | 4 | This package provides an MCP (Model Context Protocol) server for Trino. 
5 | It enables AI systems to interact with Trino databases using the standardized 6 | MCP protocol. 7 | """ 8 | 9 | __version__ = "0.1.2" 10 | ``` -------------------------------------------------------------------------------- /etc/catalog/bullshit.properties: -------------------------------------------------------------------------------- ``` 1 | connector.name=hive 2 | hive.metastore.uri=thrift://hive-metastore:9083 3 | hive.non-managed-table-writes-enabled=true 4 | hive.parquet.use-column-names=true 5 | hive.max-partitions-per-scan=1000000 6 | hive.metastore-cache-ttl=60m 7 | hive.metastore-refresh-interval=5m ``` -------------------------------------------------------------------------------- /openapi.json: -------------------------------------------------------------------------------- ```json 1 | {"openapi":"3.1.0","info":{"title":"Trino MCP Health API","version":"0.1.0"},"paths":{"/health":{"get":{"summary":"Health","operationId":"health_health_get","responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{}}}}}}}}} ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` 1 | [pytest] 2 | testpaths = tests 3 | python_files = test_*.py 4 | python_classes = Test* 5 | python_functions = test_* 6 | markers = 7 | integration: marks tests as integration tests (deselect with '-m "not integration"') 8 | docker: marks tests that require docker (deselect with '-m "not docker"') 9 | filterwarnings = 10 | ignore::DeprecationWarning 11 | ignore::PendingDeprecationWarning ``` -------------------------------------------------------------------------------- /tools/setup/setup_tables.sql: -------------------------------------------------------------------------------- ```sql 1 | -- Setup Tables in Trino with Hive Metastore 2 | -- This script creates necessary schemas and tables, then loads the Parquet data 3 | 4 
| -- Create a schema for our data 5 | CREATE SCHEMA IF NOT EXISTS bullshit.raw; 6 | 7 | -- Create a table for our parquet data 8 | CREATE TABLE IF NOT EXISTS bullshit.raw.bullshit_data ( 9 | id BIGINT, 10 | name VARCHAR, 11 | value DOUBLE, 12 | category VARCHAR, 13 | created_at TIMESTAMP 14 | ) 15 | WITH ( 16 | external_location = 'file:///opt/trino/data', 17 | format = 'PARQUET' 18 | ); 19 | 20 | -- Show tables in our schema 21 | SELECT * FROM bullshit.raw.bullshit_data LIMIT 10; 22 | 23 | -- Create a view for convenience 24 | CREATE OR REPLACE VIEW bullshit.raw.bullshit_summary AS 25 | SELECT 26 | category, 27 | COUNT(*) as count, 28 | AVG(value) as avg_value, 29 | MIN(value) as min_value, 30 | MAX(value) as max_value 31 | FROM bullshit.raw.bullshit_data 32 | GROUP BY category; 33 | 34 | -- Query the view 35 | SELECT * FROM bullshit.raw.bullshit_summary ORDER BY count DESC; ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile 1 | FROM python:3.10-slim 2 | 3 | WORKDIR /app 4 | 5 | # Create a non-root user 6 | RUN groupadd -r trino && useradd --no-log-init -r -g trino trino && \ 7 | mkdir -p /app/logs && \ 8 | chown -R trino:trino /app 9 | 10 | # Install runtime dependencies 11 | RUN apt-get update && apt-get install -y --no-install-recommends \ 12 | curl \ 13 | build-essential \ 14 | && rm -rf /var/lib/apt/lists/* 15 | 16 | # Copy project files 17 | COPY . /app/ 18 | 19 | # Install the MCP server 20 | RUN pip install --no-cache-dir . 
21 | 22 | # Set environment variables 23 | ENV PYTHONUNBUFFERED=1 24 | ENV PYTHONDONTWRITEBYTECODE=1 25 | ENV MCP_HOST=0.0.0.0 26 | ENV MCP_PORT=8000 27 | ENV TRINO_HOST=trino 28 | ENV TRINO_PORT=8080 29 | ENV TRINO_USER=trino 30 | ENV TRINO_CATALOG=memory 31 | 32 | # Expose ports for SSE transport and LLM API 33 | EXPOSE 8000 8001 34 | 35 | # Switch to non-root user 36 | USER trino 37 | 38 | # Health check - use port 8001 for the health check endpoint and LLM API 39 | HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ 40 | CMD curl -f http://localhost:8001/health || exit 1 41 | 42 | # Default command (can be overridden) 43 | ENTRYPOINT ["python", "-m", "trino_mcp.server"] 44 | 45 | # Default arguments (can be overridden) 46 | CMD ["--transport", "sse", "--host", "0.0.0.0", "--port", "8000", "--trino-host", "trino", "--trino-port", "8080", "--debug"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [build-system] 2 | requires = ["hatchling"] 3 | build-backend = "hatchling.build" 4 | 5 | [project] 6 | name = "trino-mcp" 7 | version = "0.1.2" 8 | description = "Model Context Protocol (MCP) server for Trino" 9 | readme = "README.md" 10 | requires-python = ">=3.10" 11 | authors = [ 12 | {name = "Trino MCP Team"} 13 | ] 14 | classifiers = [ 15 | "Programming Language :: Python :: 3", 16 | "License :: OSI Approved :: MIT License", 17 | "Operating System :: OS Independent", 18 | "Development Status :: 3 - Alpha", 19 | ] 20 | dependencies = [ 21 | "mcp>=1.3.0,<1.4.0", 22 | "fastapi>=0.100.0", 23 | "trino>=0.329.0", 24 | "pydantic>=2.0.0", 25 | "loguru>=0.7.0", 26 | "uvicorn>=0.23.0", 27 | "contextlib-chdir>=1.0.2", 28 | ] 29 | 30 | [project.optional-dependencies] 31 | dev = [ 32 | "black>=23.0.0", 33 | "isort>=5.12.0", 34 | "mypy>=1.4.0", 35 | "pytest>=7.3.1", 36 | "pytest-cov>=4.1.0", 37 | ] 38 | 39 | 
[project.scripts] 40 | trino-mcp = "trino_mcp.server:main" 41 | 42 | [tool.hatch.build.targets.wheel] 43 | packages = ["src/trino_mcp"] 44 | 45 | [tool.black] 46 | line-length = 100 47 | target-version = ["py310"] 48 | 49 | [tool.isort] 50 | profile = "black" 51 | line_length = 100 52 | 53 | [tool.mypy] 54 | python_version = "3.10" 55 | warn_return_any = true 56 | warn_unused_configs = true 57 | disallow_untyped_defs = true 58 | disallow_incomplete_defs = true 59 | 60 | [[tool.mypy.overrides]] 61 | module = "tests.*" 62 | disallow_untyped_defs = false 63 | disallow_incomplete_defs = false 64 | ``` -------------------------------------------------------------------------------- /tools/run_queries.sh: -------------------------------------------------------------------------------- ```bash 1 | #!/bin/bash 2 | 3 | # Wait for Trino to be ready 4 | echo "Waiting for Trino to be ready..." 5 | sleep 30 6 | 7 | echo "Creating schema in memory catalog..." 8 | docker exec -it trino_mcp_trino_1 trino --execute "CREATE SCHEMA IF NOT EXISTS memory.bullshit" 9 | 10 | echo "Creating table with sample data..." 11 | docker exec -it trino_mcp_trino_1 trino --execute " 12 | CREATE TABLE memory.bullshit.bullshit_data AS 13 | SELECT * FROM ( 14 | VALUES 15 | (1, 'Sample 1', 10.5, 'A', TIMESTAMP '2023-01-01 12:00:00'), 16 | (2, 'Sample 2', 20.7, 'B', TIMESTAMP '2023-01-02 13:00:00'), 17 | (3, 'Sample 3', 15.2, 'A', TIMESTAMP '2023-01-03 14:00:00'), 18 | (4, 'Sample 4', 30.1, 'C', TIMESTAMP '2023-01-04 15:00:00'), 19 | (5, 'Sample 5', 25.8, 'B', TIMESTAMP '2023-01-05 16:00:00') 20 | ) AS t(id, name, value, category, created_at) 21 | " 22 | 23 | echo "Querying data from table..." 24 | docker exec -it trino_mcp_trino_1 trino --execute "SELECT * FROM memory.bullshit.bullshit_data" 25 | 26 | echo "Creating summary view..." 
27 | docker exec -it trino_mcp_trino_1 trino --execute " 28 | CREATE OR REPLACE VIEW memory.bullshit.bullshit_summary AS 29 | SELECT 30 | category, 31 | COUNT(*) as count, 32 | AVG(value) as avg_value, 33 | MIN(value) as min_value, 34 | MAX(value) as max_value 35 | FROM 36 | memory.bullshit.bullshit_data 37 | GROUP BY 38 | category 39 | " 40 | 41 | echo "Querying summary view..." 42 | docker exec -it trino_mcp_trino_1 trino --execute "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC" 43 | 44 | echo "Setup complete." ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown 1 | # Changelog 2 | 3 | ## v0.1.2 (2023-06-01) 4 | 5 | ### ✨ New Features 6 | 7 | - **Integrated LLM API**: Added a native REST API endpoint to the MCP server for direct LLM queries 8 | - **Built-in FastAPI Endpoint**: Port 8001 now exposes a JSON API for running SQL queries without wrapper scripts 9 | - **Query Endpoint**: Added `/api/query` endpoint for executing SQL against Trino with JSON responses 10 | 11 | ### 📝 Documentation 12 | 13 | - Updated README with API usage instructions 14 | - Added code examples for the REST API 15 | 16 | ## v0.1.1 (2023-05-17) 17 | 18 | ### 🐛 Bug Fixes 19 | 20 | - **Fixed Trino client catalog handling**: The Trino client now correctly sets the catalog in connection parameters instead of using unreliable `USE catalog` statements. 21 | - **Improved query execution**: Queries now correctly execute against specified catalogs. 22 | - **Added error handling**: Better error handling for catalog and schema operations. 23 | 24 | ### 📝 Documentation 25 | 26 | - Added detailed documentation about transport options and known issues. 27 | - Created test scripts demonstrating successful MCP-Trino interaction. 28 | - Documented workarounds for MCP 1.3.0 SSE transport issues. 
29 | 30 | ### 🧪 Testing 31 | 32 | - Added `test_mcp_stdio.py` for testing MCP with STDIO transport. 33 | - Added catalog connection testing scripts and diagnostics. 34 | 35 | ### 🚧 Known Issues 36 | 37 | - MCP 1.3.0 SSE transport has issues with client disconnection. 38 | - Use STDIO transport for reliable operation until upgrading to a newer MCP version. ``` -------------------------------------------------------------------------------- /tools/setup/setup_data.sh: -------------------------------------------------------------------------------- ```bash 1 | #!/bin/bash 2 | set -e 3 | 4 | echo "Waiting for Trino to become available..." 5 | max_attempts=20 6 | attempt=0 7 | while [ $attempt -lt $max_attempts ]; do 8 | if curl -s "http://localhost:9095/v1/info" > /dev/null; then 9 | echo "Trino is available!" 10 | break 11 | else 12 | attempt=$((attempt + 1)) 13 | echo "Attempt $attempt/$max_attempts: Trino not yet available. Waiting 5 seconds..." 14 | sleep 5 15 | fi 16 | done 17 | 18 | if [ $attempt -eq $max_attempts ]; then 19 | echo "Failed to connect to Trino after multiple attempts" 20 | exit 1 21 | fi 22 | 23 | echo -e "\n=== Creating schema and table ===" 24 | # Create a schema and table that points to our Parquet files 25 | trino_query=" 26 | -- Create schema if it doesn't exist 27 | CREATE SCHEMA IF NOT EXISTS bullshit.datasets 28 | WITH (location = 'file:///opt/trino/data'); 29 | 30 | -- Create table pointing to our Parquet file 31 | CREATE TABLE IF NOT EXISTS bullshit.datasets.employees 32 | WITH ( 33 | external_location = 'file:///opt/trino/data/bullshit_data.parquet', 34 | format = 'PARQUET' 35 | ) 36 | AS SELECT * FROM parquet 'file:///opt/trino/data/bullshit_data.parquet'; 37 | " 38 | 39 | # Execute the queries 40 | echo "$trino_query" | curl -s -X POST -H "X-Trino-User: trino" --data-binary @- http://localhost:9095/v1/statement | jq 41 | 42 | echo -e "\n=== Verifying data ===" 43 | # Run a simple query to verify the table 44 | curl -s -X POST -H 
"X-Trino-User: trino" --data "SELECT COUNT(*) FROM bullshit.datasets.employees" http://localhost:9095/v1/statement | jq 45 | curl -s -X POST -H "X-Trino-User: trino" --data "SELECT * FROM bullshit.datasets.employees LIMIT 3" http://localhost:9095/v1/statement | jq 46 | 47 | echo -e "\nSetup complete!" ``` -------------------------------------------------------------------------------- /run_tests.sh: -------------------------------------------------------------------------------- ```bash 1 | #!/bin/bash 2 | set -e 3 | 4 | # Setup colors for output 5 | GREEN='\033[0;32m' 6 | RED='\033[0;31m' 7 | YELLOW='\033[0;33m' 8 | NC='\033[0m' # No Color 9 | 10 | echo -e "${YELLOW}Trino MCP Server Test Runner${NC}" 11 | echo "===============================" 12 | 13 | # Check for virtual environment 14 | if [ -z "$VIRTUAL_ENV" ]; then 15 | echo -e "${YELLOW}No virtual environment detected.${NC}" 16 | 17 | # Check if venv exists 18 | if [ -d "venv" ]; then 19 | echo -e "${GREEN}Activating existing virtual environment...${NC}" 20 | source venv/bin/activate 21 | else 22 | echo -e "${YELLOW}Creating new virtual environment...${NC}" 23 | python -m venv venv 24 | source venv/bin/activate 25 | echo -e "${GREEN}Installing dependencies...${NC}" 26 | pip install -e ".[dev]" 27 | fi 28 | fi 29 | 30 | # Function to check if a command exists 31 | command_exists() { 32 | command -v "$1" &> /dev/null 33 | } 34 | 35 | # Check for Trino availability 36 | echo -e "${YELLOW}Checking Trino availability...${NC}" 37 | 38 | TRINO_HOST=${TEST_TRINO_HOST:-localhost} 39 | TRINO_PORT=${TEST_TRINO_PORT:-9095} 40 | 41 | if command_exists curl; then 42 | if curl -s -o /dev/null -w "%{http_code}" http://${TRINO_HOST}:${TRINO_PORT}/v1/info | grep -q "200"; then 43 | echo -e "${GREEN}Trino is available at ${TRINO_HOST}:${TRINO_PORT}${NC}" 44 | else 45 | echo -e "${RED}WARNING: Trino does not appear to be available at ${TRINO_HOST}:${TRINO_PORT}.${NC}" 46 | echo -e "${YELLOW}Some tests may be skipped or 
fail.${NC}" 47 | echo -e "${YELLOW}You may need to start Trino with Docker: docker-compose up -d${NC}" 48 | missing_trino=true 49 | fi 50 | else 51 | echo -e "${YELLOW}curl not found, skipping Trino availability check${NC}" 52 | fi 53 | 54 | # Run the tests 55 | echo -e "${YELLOW}Running unit tests...${NC}" 56 | pytest tests/ -v --ignore=tests/integration 57 | 58 | echo "" 59 | echo -e "${YELLOW}Running integration tests...${NC}" 60 | echo -e "${YELLOW}(These may be skipped if Docker is not available)${NC}" 61 | pytest tests/integration/ -v 62 | 63 | echo "" 64 | echo -e "${GREEN}All tests completed!${NC}" ``` -------------------------------------------------------------------------------- /src/trino_mcp/config.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Configuration module for the Trino MCP server. 3 | """ 4 | from dataclasses import dataclass, field 5 | from typing import Any, Dict, Optional 6 | 7 | 8 | @dataclass 9 | class TrinoConfig: 10 | """Configuration for the Trino connection.""" 11 | host: str = "localhost" 12 | port: int = 8080 13 | user: str = "trino" 14 | password: Optional[str] = None 15 | catalog: Optional[str] = None 16 | schema: Optional[str] = None 17 | http_scheme: str = "http" 18 | auth: Optional[Any] = None 19 | max_attempts: int = 3 20 | request_timeout: float = 30.0 21 | http_headers: Dict[str, str] = field(default_factory=dict) 22 | verify: bool = True 23 | 24 | @property 25 | def connection_params(self) -> Dict[str, Any]: 26 | """Return connection parameters for the Trino client.""" 27 | params = { 28 | "host": self.host, 29 | "port": self.port, 30 | "user": self.user, 31 | "http_scheme": self.http_scheme, 32 | "max_attempts": self.max_attempts, 33 | "request_timeout": self.request_timeout, 34 | "verify": self.verify, 35 | } 36 | 37 | if self.password: 38 | params["password"] = self.password 39 | 40 | if self.auth: 41 | params["auth"] = self.auth 42 | 43 | if 
self.http_headers: 44 | params["http_headers"] = self.http_headers 45 | 46 | return params 47 | 48 | 49 | @dataclass 50 | class ServerConfig: 51 | """Configuration for the MCP server.""" 52 | name: str = "Trino MCP" 53 | version: str = "0.1.0" 54 | transport_type: str = "stdio" # "stdio" or "sse" 55 | host: str = "127.0.0.1" 56 | port: int = 3000 57 | debug: bool = False 58 | trino: TrinoConfig = field(default_factory=TrinoConfig) 59 | 60 | 61 | def load_config_from_env() -> ServerConfig: 62 | """ 63 | Load configuration from environment variables. 64 | 65 | Returns: 66 | ServerConfig: The server configuration. 67 | """ 68 | # This would normally load from environment variables or a config file 69 | # For now, we'll just return the default config 70 | return ServerConfig() 71 | ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml 1 | version: '3.8' 2 | 3 | services: 4 | # Hive Metastore with embedded Derby 5 | hive-metastore: 6 | image: apache/hive:3.1.3 7 | container_name: trino_mcp_hive_metastore_1 8 | environment: 9 | SERVICE_NAME: metastore 10 | HIVE_METASTORE_WAREHOUSE_DIR: /opt/hive/warehouse 11 | command: /opt/hive/bin/hive --service metastore 12 | volumes: 13 | - ./data:/opt/hive/data 14 | - hive-data:/opt/hive/warehouse 15 | ports: 16 | - "9083:9083" 17 | networks: 18 | - trino-net 19 | restart: unless-stopped 20 | healthcheck: 21 | test: ["CMD", "nc", "-z", "localhost", "9083"] 22 | interval: 10s 23 | timeout: 5s 24 | retries: 5 25 | start_period: 20s 26 | 27 | # Trino service 28 | trino: 29 | image: trinodb/trino:latest 30 | container_name: trino_mcp_trino_1 31 | ports: 32 | - "9095:8080" 33 | volumes: 34 | - ./etc:/etc/trino:ro 35 | - ./data:/opt/trino/data:ro 36 | - trino-data:/data/trino 37 | - trino-logs:/var/log/trino 38 | environment: 39 | - JAVA_OPTS=-Xmx2G -XX:+UseG1GC 40 | networks: 41 | - trino-net 42 | 
depends_on: 43 | - hive-metastore 44 | deploy: 45 | resources: 46 | limits: 47 | cpus: '2' 48 | memory: 3G 49 | reservations: 50 | cpus: '0.5' 51 | memory: 1G 52 | healthcheck: 53 | test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"] 54 | interval: 10s 55 | timeout: 5s 56 | retries: 5 57 | start_period: 30s 58 | restart: unless-stopped 59 | 60 | # MCP server for Trino 61 | trino-mcp: 62 | build: 63 | context: . 64 | dockerfile: Dockerfile 65 | container_name: trino_mcp_trino-mcp_1 66 | ports: 67 | - "9096:8000" # Main MCP SSE port 68 | - "9097:8001" # LLM API port with health check endpoint 69 | volumes: 70 | - mcp-logs:/app/logs 71 | environment: 72 | - PYTHONUNBUFFERED=1 73 | - TRINO_HOST=trino 74 | - TRINO_PORT=8080 75 | - LOG_LEVEL=INFO 76 | - MCP_HOST=0.0.0.0 77 | - MCP_PORT=8000 78 | depends_on: 79 | trino: 80 | condition: service_healthy 81 | networks: 82 | - trino-net 83 | deploy: 84 | resources: 85 | limits: 86 | cpus: '1' 87 | memory: 1G 88 | reservations: 89 | cpus: '0.25' 90 | memory: 512M 91 | healthcheck: 92 | test: ["CMD", "curl", "-f", "http://localhost:8001/health"] 93 | interval: 20s 94 | timeout: 5s 95 | retries: 3 96 | start_period: 10s 97 | restart: unless-stopped 98 | 99 | networks: 100 | trino-net: 101 | driver: bridge 102 | name: trino_network 103 | 104 | volumes: 105 | trino-data: 106 | name: trino_data 107 | trino-logs: 108 | name: trino_logs 109 | mcp-logs: 110 | name: mcp_logs 111 | hive-data: 112 | name: hive_warehouse_data ``` -------------------------------------------------------------------------------- /llm_trino_api.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Simple FastAPI server that lets LLMs query Trino through MCP via a REST API. 
4 | 5 | Run with: 6 | pip install fastapi uvicorn 7 | uvicorn llm_trino_api:app --reload 8 | 9 | This creates a REST API endpoint at: 10 | http://localhost:8000/query 11 | 12 | Example curl: 13 | curl -X POST "http://localhost:8000/query" \\ 14 | -H "Content-Type: application/json" \\ 15 | -d '{"query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3"}' 16 | """ 17 | import fastapi 18 | import pydantic 19 | from llm_query_trino import query_trino, format_results 20 | from typing import Optional, Dict, Any 21 | 22 | # Create FastAPI app 23 | app = fastapi.FastAPI( 24 | title="Trino MCP API for LLMs", 25 | description="Simple API to query Trino via MCP protocol for LLMs", 26 | version="0.1.0" 27 | ) 28 | 29 | # Define request model 30 | class QueryRequest(pydantic.BaseModel): 31 | query: str 32 | catalog: str = "memory" 33 | schema: Optional[str] = "bullshit" 34 | explain: bool = False 35 | 36 | # Define response model 37 | class QueryResponse(pydantic.BaseModel): 38 | success: bool 39 | message: str 40 | results: Optional[Dict[str, Any]] = None 41 | formatted_results: Optional[str] = None 42 | 43 | @app.post("/query", response_model=QueryResponse) 44 | async def trino_query(request: QueryRequest): 45 | """ 46 | Execute a SQL query against Trino via MCP and return results. 
47 | 48 | Example: 49 | ```json 50 | { 51 | "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3", 52 | "catalog": "memory", 53 | "schema": "bullshit" 54 | } 55 | ``` 56 | """ 57 | try: 58 | # If explain mode is on, add EXPLAIN to the query 59 | query = request.query 60 | if request.explain: 61 | query = f"EXPLAIN {query}" 62 | 63 | # Execute the query 64 | results = query_trino(query, request.catalog, request.schema) 65 | 66 | # Check for errors 67 | if "error" in results: 68 | return QueryResponse( 69 | success=False, 70 | message=f"Query execution failed: {results['error']}", 71 | results=results 72 | ) 73 | 74 | # Format results for human readability 75 | formatted_results = format_results(results) 76 | 77 | return QueryResponse( 78 | success=True, 79 | message="Query executed successfully", 80 | results=results, 81 | formatted_results=formatted_results 82 | ) 83 | 84 | except Exception as e: 85 | return QueryResponse( 86 | success=False, 87 | message=f"Error executing query: {str(e)}" 88 | ) 89 | 90 | @app.get("/") 91 | async def root(): 92 | """Root endpoint with usage instructions.""" 93 | return { 94 | "message": "Trino MCP API for LLMs", 95 | "usage": "POST to /query with JSON body containing 'query', 'catalog' (optional), and 'schema' (optional)", 96 | "example": { 97 | "query": "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 3", 98 | "catalog": "memory", 99 | "schema": "bullshit" 100 | } 101 | } 102 | 103 | if __name__ == "__main__": 104 | import uvicorn 105 | uvicorn.run(app, host="127.0.0.1", port=8008) ``` -------------------------------------------------------------------------------- /scripts/test_direct_query.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Direct test script that bypasses MCP and uses the Trino client directly. 4 | This helps determine if the issue is with the MCP protocol or with Trino. 
5 | """ 6 | import time 7 | import argparse 8 | from typing import Optional, Dict, Any 9 | 10 | # Import the client class from the module 11 | from src.trino_mcp.trino_client import TrinoClient 12 | from src.trino_mcp.config import TrinoConfig 13 | 14 | def main(): 15 | """ 16 | Run direct queries against Trino without using MCP. 17 | """ 18 | print("Direct Trino test - bypassing MCP") 19 | 20 | # Configure Trino client 21 | config = TrinoConfig( 22 | host="localhost", 23 | port=9095, # The exposed Trino port 24 | user="trino", 25 | catalog="memory", 26 | schema=None, 27 | http_scheme="http" 28 | ) 29 | 30 | client = TrinoClient(config) 31 | 32 | try: 33 | # Connect to Trino 34 | print("Connecting to Trino...") 35 | client.connect() 36 | print("Connected successfully!") 37 | 38 | # List catalogs 39 | print("\nListing catalogs:") 40 | catalogs = client.get_catalogs() 41 | for catalog in catalogs: 42 | print(f"- {catalog['name']}") 43 | 44 | # List schemas in memory catalog 45 | print("\nListing schemas in memory catalog:") 46 | schemas = client.get_schemas("memory") 47 | for schema in schemas: 48 | print(f"- {schema['name']}") 49 | 50 | # Look for our test schema 51 | if any(schema['name'] == 'bullshit' for schema in schemas): 52 | print("\nFound our test schema 'bullshit'") 53 | 54 | # List tables 55 | print("\nListing tables in memory.bullshit:") 56 | tables = client.get_tables("memory", "bullshit") 57 | for table in tables: 58 | print(f"- {table['name']}") 59 | 60 | # Query the data table 61 | if any(table['name'] == 'bullshit_data' for table in tables): 62 | print("\nQuerying memory.bullshit.bullshit_data:") 63 | result = client.execute_query("SELECT * FROM memory.bullshit.bullshit_data") 64 | 65 | # Print columns 66 | print(f"Columns: {', '.join(result.columns)}") 67 | 68 | # Print rows 69 | print(f"Rows ({result.row_count}):") 70 | for row in result.rows: 71 | print(f" {row}") 72 | 73 | # Query the summary view 74 | if any(table['name'] == 'bullshit_summary' 
for table in tables): 75 | print("\nQuerying memory.bullshit.bullshit_summary:") 76 | result = client.execute_query( 77 | "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC" 78 | ) 79 | 80 | # Print columns 81 | print(f"Columns: {', '.join(result.columns)}") 82 | 83 | # Print rows 84 | print(f"Rows ({result.row_count}):") 85 | for row in result.rows: 86 | print(f" {row}") 87 | else: 88 | print("Summary view not found") 89 | else: 90 | print("Data table not found") 91 | else: 92 | print("Test schema 'bullshit' not found") 93 | 94 | except Exception as e: 95 | print(f"Error: {e}") 96 | finally: 97 | # Disconnect 98 | if client.conn: 99 | print("\nDisconnecting from Trino...") 100 | client.disconnect() 101 | print("Disconnected.") 102 | 103 | if __name__ == "__main__": 104 | main() ``` -------------------------------------------------------------------------------- /src/trino_mcp/resources/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | MCP resources for interacting with Trino. 3 | """ 4 | from dataclasses import dataclass 5 | from typing import Any, Dict, List, Optional, Tuple 6 | 7 | from mcp.server.fastmcp import Context, FastMCP 8 | 9 | from trino_mcp.trino_client import TrinoClient 10 | 11 | 12 | def register_trino_resources(mcp: FastMCP, client: TrinoClient) -> None: 13 | """ 14 | Register Trino resources with the MCP server. 15 | 16 | Args: 17 | mcp: The MCP server instance. 18 | client: The Trino client instance. 19 | """ 20 | 21 | @mcp.resource("trino://catalog") 22 | def list_catalogs() -> List[Dict[str, Any]]: 23 | """ 24 | List all available Trino catalogs. 25 | """ 26 | return client.get_catalogs() 27 | 28 | @mcp.resource("trino://catalog/{catalog}") 29 | def get_catalog(catalog: str) -> Dict[str, Any]: 30 | """ 31 | Get information about a specific Trino catalog. 
32 | """ 33 | # For now, just return basic info - could be enhanced later 34 | return {"name": catalog} 35 | 36 | @mcp.resource("trino://catalog/{catalog}/schemas") 37 | def list_schemas(catalog: str) -> List[Dict[str, Any]]: 38 | """ 39 | List all schemas in a Trino catalog. 40 | """ 41 | return client.get_schemas(catalog) 42 | 43 | @mcp.resource("trino://catalog/{catalog}/schema/{schema}") 44 | def get_schema(catalog: str, schema: str) -> Dict[str, Any]: 45 | """ 46 | Get information about a specific Trino schema. 47 | """ 48 | return {"name": schema, "catalog": catalog} 49 | 50 | @mcp.resource("trino://catalog/{catalog}/schema/{schema}/tables") 51 | def list_tables(catalog: str, schema: str) -> List[Dict[str, Any]]: 52 | """ 53 | List all tables in a Trino schema. 54 | """ 55 | return client.get_tables(catalog, schema) 56 | 57 | @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}") 58 | def get_table(catalog: str, schema: str, table: str) -> Dict[str, Any]: 59 | """ 60 | Get information about a specific Trino table. 61 | """ 62 | return client.get_table_details(catalog, schema, table) 63 | 64 | @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}/columns") 65 | def list_columns(catalog: str, schema: str, table: str) -> List[Dict[str, Any]]: 66 | """ 67 | List all columns in a Trino table. 68 | """ 69 | return client.get_columns(catalog, schema, table) 70 | 71 | @mcp.resource("trino://catalog/{catalog}/schema/{schema}/table/{table}/column/{column}") 72 | def get_column(catalog: str, schema: str, table: str, column: str) -> Dict[str, Any]: 73 | """ 74 | Get information about a specific Trino column. 
75 | """ 76 | columns = client.get_columns(catalog, schema, table) 77 | for col in columns: 78 | if col["name"] == column: 79 | return col 80 | 81 | # If column not found, return a basic structure 82 | return { 83 | "name": column, 84 | "catalog": catalog, 85 | "schema": schema, 86 | "table": table, 87 | "error": "Column not found" 88 | } 89 | 90 | @mcp.resource("trino://query/{query_id}") 91 | def get_query_result(query_id: str) -> Dict[str, Any]: 92 | """ 93 | Get the result of a specific Trino query by its ID. 94 | """ 95 | # This is a placeholder, as we don't store query results by ID in this basic implementation 96 | # In a real implementation, you would look up the query results from a cache or storage 97 | return { 98 | "query_id": query_id, 99 | "error": "Query results not available. This resource is for demonstration purposes only." 100 | } 101 | ``` -------------------------------------------------------------------------------- /scripts/fix_trino_session.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Direct test of Trino client session catalog handling. 4 | This script tests various ways to set the catalog name in Trino. 
5 | """ 6 | import sys 7 | import time 8 | import traceback 9 | import trino 10 | 11 | def test_trino_sessions(): 12 | """Test different approaches to setting the catalog in Trino sessions""" 13 | print("🔬 Testing Trino session catalog handling") 14 | 15 | # Test 1: Default connection and USE statements 16 | print("\n=== Test 1: Default connection with USE statements ===") 17 | try: 18 | conn = trino.dbapi.connect( 19 | host="trino", 20 | port=8080, 21 | user="trino", 22 | http_scheme="http" 23 | ) 24 | 25 | print("Connection established") 26 | cursor1 = conn.cursor() 27 | 28 | # Try to set catalog with USE statement 29 | print("Setting catalog with USE statement") 30 | cursor1.execute("USE memory") 31 | 32 | # Try a query with the set catalog 33 | print("Executing query with set catalog") 34 | try: 35 | cursor1.execute("SELECT 1 as test") 36 | result = cursor1.fetchall() 37 | print(f"Result: {result}") 38 | except Exception as e: 39 | print(f"❌ Query failed: {e}") 40 | 41 | conn.close() 42 | except Exception as e: 43 | print(f"❌ Test 1 failed: {e}") 44 | traceback.print_exception(type(e), e, e.__traceback__) 45 | 46 | # Test 2: Connection with catalog parameter 47 | print("\n=== Test 2: Connection with catalog parameter ===") 48 | try: 49 | conn = trino.dbapi.connect( 50 | host="trino", 51 | port=8080, 52 | user="trino", 53 | http_scheme="http", 54 | catalog="memory" 55 | ) 56 | 57 | print("Connection established with catalog parameter") 58 | cursor2 = conn.cursor() 59 | 60 | # Try a query with the catalog parameter 61 | print("Executing query with catalog parameter") 62 | try: 63 | cursor2.execute("SELECT 1 as test") 64 | result = cursor2.fetchall() 65 | print(f"Result: {result}") 66 | except Exception as e: 67 | print(f"❌ Query failed: {e}") 68 | 69 | conn.close() 70 | except Exception as e: 71 | print(f"❌ Test 2 failed: {e}") 72 | traceback.print_exception(type(e), e, e.__traceback__) 73 | 74 | # Test 3: Explicit catalog in query 75 | print("\n=== Test 3: 
Explicit catalog in query ===") 76 | try: 77 | conn = trino.dbapi.connect( 78 | host="trino", 79 | port=8080, 80 | user="trino", 81 | http_scheme="http" 82 | ) 83 | 84 | print("Connection established") 85 | cursor3 = conn.cursor() 86 | 87 | # Try a query with explicit catalog in the query 88 | print("Executing query with explicit catalog") 89 | try: 90 | cursor3.execute("SELECT 1 as test FROM memory.information_schema.tables WHERE 1=0") 91 | result = cursor3.fetchall() 92 | print(f"Result: {result}") 93 | except Exception as e: 94 | print(f"❌ Query failed: {e}") 95 | 96 | conn.close() 97 | except Exception as e: 98 | print(f"❌ Test 3 failed: {e}") 99 | traceback.print_exception(type(e), e, e.__traceback__) 100 | 101 | # Test 4: Connection parameters with session properties 102 | print("\n=== Test 4: Connection with session properties ===") 103 | try: 104 | conn = trino.dbapi.connect( 105 | host="trino", 106 | port=8080, 107 | user="trino", 108 | http_scheme="http", 109 | catalog="memory", 110 | session_properties={"catalog": "memory"} 111 | ) 112 | 113 | print("Connection established with session properties") 114 | cursor4 = conn.cursor() 115 | 116 | # Try a query with session properties 117 | print("Executing query with session properties") 118 | try: 119 | cursor4.execute("SELECT 1 as test") 120 | result = cursor4.fetchall() 121 | print(f"Result: {result}") 122 | except Exception as e: 123 | print(f"❌ Query failed: {e}") 124 | 125 | conn.close() 126 | except Exception as e: 127 | print(f"❌ Test 4 failed: {e}") 128 | traceback.print_exception(type(e), e, e.__traceback__) 129 | 130 | print("\n🏁 Testing complete!") 131 | 132 | if __name__ == "__main__": 133 | test_trino_sessions() ``` -------------------------------------------------------------------------------- /examples/simple_mcp_query.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Simple example script for Trino MCP querying 
using STDIO transport. 4 | 5 | This demonstrates the most basic end-to-end flow of running a query through MCP. 6 | """ 7 | import json 8 | import subprocess 9 | import sys 10 | import time 11 | 12 | def run_query_with_mcp(sql_query: str, catalog: str = "memory"): 13 | """ 14 | Run a SQL query against Trino using the MCP STDIO transport. 15 | 16 | Args: 17 | sql_query: The SQL query to run 18 | catalog: The catalog to use (default: memory) 19 | 20 | Returns: 21 | The query results (if successful) 22 | """ 23 | print(f"🚀 Running query with Trino MCP") 24 | print(f"SQL: {sql_query}") 25 | print(f"Catalog: {catalog}") 26 | 27 | # Start the MCP server with STDIO transport 28 | cmd = [ 29 | "docker", "exec", "-i", "trino_mcp_trino-mcp_1", 30 | "python", "-m", "trino_mcp.server", 31 | "--transport", "stdio", 32 | "--trino-host", "trino", 33 | "--trino-port", "8080", 34 | "--trino-user", "trino", 35 | "--trino-catalog", catalog 36 | ] 37 | 38 | try: 39 | process = subprocess.Popen( 40 | cmd, 41 | stdin=subprocess.PIPE, 42 | stdout=subprocess.PIPE, 43 | stderr=subprocess.PIPE, 44 | text=True, 45 | bufsize=1 46 | ) 47 | 48 | # Allow server to start 49 | time.sleep(1) 50 | 51 | # Function to send requests and get responses 52 | def send_request(request): 53 | request_json = json.dumps(request) + "\n" 54 | process.stdin.write(request_json) 55 | process.stdin.flush() 56 | 57 | response = process.stdout.readline() 58 | if response: 59 | return json.loads(response) 60 | return None 61 | 62 | # Step 1: Initialize MCP 63 | print("\n1. 
Initializing MCP...") 64 | init_request = { 65 | "jsonrpc": "2.0", 66 | "id": 1, 67 | "method": "initialize", 68 | "params": { 69 | "protocolVersion": "2024-11-05", 70 | "clientInfo": { 71 | "name": "simple-example", 72 | "version": "1.0.0" 73 | }, 74 | "capabilities": { 75 | "tools": True 76 | } 77 | } 78 | } 79 | 80 | init_response = send_request(init_request) 81 | if not init_response: 82 | raise Exception("Failed to initialize MCP") 83 | 84 | print("✅ MCP initialized") 85 | 86 | # Step 2: Send initialized notification 87 | init_notification = { 88 | "jsonrpc": "2.0", 89 | "method": "notifications/initialized", 90 | "params": {} 91 | } 92 | 93 | send_request(init_notification) 94 | 95 | # Step 3: Execute query 96 | print("\n2. Executing query...") 97 | query_request = { 98 | "jsonrpc": "2.0", 99 | "id": 2, 100 | "method": "tools/call", 101 | "params": { 102 | "name": "execute_query", 103 | "arguments": { 104 | "sql": sql_query, 105 | "catalog": catalog 106 | } 107 | } 108 | } 109 | 110 | query_response = send_request(query_request) 111 | if not query_response: 112 | raise Exception("Failed to execute query") 113 | 114 | if "error" in query_response: 115 | error = query_response["error"] 116 | print(f"❌ Query failed: {error}") 117 | return None 118 | 119 | # Print and return results 120 | result = query_response["result"] 121 | print("\n✅ Query executed successfully!") 122 | 123 | # Format results for display 124 | if "columns" in result: 125 | print("\nColumns:", ", ".join(result.get("columns", []))) 126 | print(f"Row count: {result.get('row_count', 0)}") 127 | 128 | if "preview_rows" in result: 129 | print("\nResults:") 130 | for i, row in enumerate(result["preview_rows"]): 131 | print(f" {i+1}. 
{row}") 132 | 133 | return result 134 | 135 | except Exception as e: 136 | print(f"❌ Error: {e}") 137 | return None 138 | finally: 139 | # Clean up 140 | if 'process' in locals(): 141 | process.terminate() 142 | try: 143 | process.wait(timeout=5) 144 | except subprocess.TimeoutExpired: 145 | process.kill() 146 | 147 | if __name__ == "__main__": 148 | # Get query from command line args or use default 149 | query = "SELECT 'Hello from Trino MCP!' AS greeting" 150 | 151 | if len(sys.argv) > 1: 152 | query = sys.argv[1] 153 | 154 | # Run the query 155 | run_query_with_mcp(query) ``` -------------------------------------------------------------------------------- /tools/setup_bullshit_table.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Set up the bullshit schema and table in Trino 4 | """ 5 | import os 6 | import time 7 | import trino 8 | import pandas as pd 9 | from trino.exceptions import TrinoExternalError 10 | 11 | # Connect to Trino 12 | def connect_to_trino(): 13 | print("Waiting for Trino to become available...") 14 | max_attempts = 20 15 | attempt = 0 16 | while attempt < max_attempts: 17 | try: 18 | conn = trino.dbapi.connect( 19 | host="localhost", 20 | port=9095, 21 | user="trino", 22 | catalog="bullshit", 23 | schema="datasets", 24 | ) 25 | 26 | # Test the connection 27 | with conn.cursor() as cursor: 28 | cursor.execute("SELECT 1") 29 | cursor.fetchone() 30 | 31 | print("Trino is available!") 32 | return conn 33 | except Exception as e: 34 | attempt += 1 35 | print(f"Attempt {attempt}/{max_attempts}: Trino not yet available. Waiting 5 seconds... 
({str(e)})") 36 | time.sleep(5) 37 | 38 | raise Exception("Failed to connect to Trino after multiple attempts") 39 | 40 | # Create schema if it doesn't exist 41 | def create_schema(conn): 42 | print("Creating schema if it doesn't exist...") 43 | with conn.cursor() as cursor: 44 | try: 45 | # Try to list tables in the schema to see if it exists 46 | try: 47 | cursor.execute("SHOW TABLES FROM bullshit.datasets") 48 | rows = cursor.fetchall() 49 | print(f"Schema already exists with {len(rows)} tables") 50 | return 51 | except Exception as e: 52 | pass # Schema probably doesn't exist, continue to create it 53 | 54 | # Create schema 55 | cursor.execute(""" 56 | CREATE SCHEMA IF NOT EXISTS bullshit.datasets 57 | WITH (location = 'file:///bullshit-data') 58 | """) 59 | print("Schema created successfully") 60 | except Exception as e: 61 | print(f"Error creating schema: {e}") 62 | # Continue anyway, the error might be that the schema already exists 63 | 64 | # Get table schema from parquet file 65 | def get_parquet_schema(): 66 | print("Reading parquet file to determine schema...") 67 | try: 68 | df = pd.read_parquet('data/bullshit_data.parquet') 69 | 70 | # Map pandas dtypes to Trino types 71 | type_mapping = { 72 | 'int64': 'INTEGER', 73 | 'int32': 'INTEGER', 74 | 'float64': 'DOUBLE', 75 | 'float32': 'DOUBLE', 76 | 'object': 'VARCHAR', 77 | 'bool': 'BOOLEAN', 78 | 'datetime64[ns]': 'TIMESTAMP', 79 | } 80 | 81 | columns = [] 82 | for col_name, dtype in df.dtypes.items(): 83 | trino_type = type_mapping.get(str(dtype), 'VARCHAR') 84 | columns.append(f'"{col_name}" {trino_type}') 85 | 86 | return columns 87 | except Exception as e: 88 | print(f"Error reading parquet file: {e}") 89 | return None 90 | 91 | # Create the table 92 | def create_table(conn, columns): 93 | print("Creating table...") 94 | columns_str = ",\n ".join(columns) 95 | sql = f""" 96 | CREATE TABLE IF NOT EXISTS bullshit.datasets.employees ( 97 | {columns_str} 98 | ) 99 | WITH ( 100 | external_location = 
'file:///bullshit-data/bullshit_data.parquet', 101 | format = 'PARQUET' 102 | ) 103 | """ 104 | print("SQL:", sql) 105 | 106 | with conn.cursor() as cursor: 107 | try: 108 | cursor.execute(sql) 109 | print("Table created successfully") 110 | except Exception as e: 111 | print(f"Error creating table: {e}") 112 | 113 | # Verify table was created by running a query 114 | def verify_table(conn): 115 | print("Verifying table creation...") 116 | with conn.cursor() as cursor: 117 | try: 118 | cursor.execute("SELECT * FROM bullshit.datasets.employees LIMIT 5") 119 | rows = cursor.fetchall() 120 | print(f"Successfully queried table with {len(rows)} rows") 121 | 122 | if rows: 123 | print("First row:") 124 | for row in rows: 125 | print(row) 126 | break 127 | except Exception as e: 128 | print(f"Error verifying table: {e}") 129 | 130 | def main(): 131 | try: 132 | conn = connect_to_trino() 133 | print("Connecting to Trino...") 134 | 135 | create_schema(conn) 136 | 137 | columns = get_parquet_schema() 138 | if columns: 139 | create_table(conn, columns) 140 | verify_table(conn) 141 | else: 142 | print("Failed to get table schema from parquet file") 143 | except Exception as e: 144 | print(f"An error occurred: {e}") 145 | 146 | if __name__ == "__main__": 147 | main() ``` -------------------------------------------------------------------------------- /tests/test_client.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Simple test script for the Trino MCP client. 4 | This script connects to the Trino MCP server and performs some basic operations. 
5 | """ 6 | import json 7 | import os 8 | import sys 9 | import time 10 | import threading 11 | from typing import Dict, Any, List, Optional, Callable 12 | 13 | import requests 14 | import sseclient 15 | import logging 16 | from rich.console import Console 17 | 18 | # Default port for MCP server, changed to match docker-compose.yml 19 | DEFAULT_MCP_HOST = "localhost" 20 | DEFAULT_MCP_PORT = 9096 21 | 22 | 23 | class SSEListener: 24 | """ 25 | Server-Sent Events (SSE) listener for MCP. 26 | This runs in a separate thread to receive notifications from the server. 27 | """ 28 | 29 | def __init__(self, url: str, message_callback: Callable[[Dict[str, Any]], None]): 30 | """ 31 | Initialize the SSE listener. 32 | 33 | Args: 34 | url: The SSE endpoint URL. 35 | message_callback: Callback function to handle incoming messages. 36 | """ 37 | self.url = url 38 | self.message_callback = message_callback 39 | self.running = False 40 | self.thread = None 41 | 42 | def start(self) -> None: 43 | """Start the SSE listener in a separate thread.""" 44 | if self.running: 45 | return 46 | 47 | self.running = True 48 | self.thread = threading.Thread(target=self._listen) 49 | self.thread.daemon = True 50 | self.thread.start() 51 | 52 | def stop(self) -> None: 53 | """Stop the SSE listener.""" 54 | self.running = False 55 | if self.thread: 56 | self.thread.join(timeout=1.0) 57 | self.thread = None 58 | 59 | def _listen(self) -> None: 60 | """Listen for SSE events.""" 61 | try: 62 | headers = {"Accept": "text/event-stream"} 63 | response = requests.get(self.url, headers=headers, stream=True) 64 | client = sseclient.SSEClient(response) 65 | 66 | for event in client.events(): 67 | if not self.running: 68 | break 69 | 70 | try: 71 | if event.data: 72 | data = json.loads(event.data) 73 | self.message_callback(data) 74 | except json.JSONDecodeError: 75 | print(f"Failed to parse SSE message: {event.data}") 76 | except Exception as e: 77 | print(f"Error processing SSE message: {e}") 78 | 79 | 
except Exception as e: 80 | if self.running: 81 | print(f"SSE connection error: {e}") 82 | finally: 83 | self.running = False 84 | 85 | 86 | def test_sse_client(base_url=f"http://localhost:{DEFAULT_MCP_PORT}"): 87 | """ 88 | Test communication with the SSE transport. 89 | 90 | Args: 91 | base_url: The base URL of the SSE server. 92 | """ 93 | print(f"Testing SSE client with {base_url}...") 94 | 95 | # First, let's check what endpoints are available 96 | print("Checking available endpoints...") 97 | try: 98 | response = requests.get(base_url) 99 | print(f"Root path status: {response.status_code}") 100 | if response.status_code == 200: 101 | print(f"Content: {response.text[:500]}") # Print first 500 chars 102 | except Exception as e: 103 | print(f"Error checking root path: {e}") 104 | 105 | # Try common MCP endpoints 106 | endpoints_to_check = [ 107 | "/mcp", 108 | "/mcp/sse", 109 | "/mcp/2024-11-05", 110 | "/mcp/message", 111 | "/api/mcp", 112 | "/api/mcp/sse" 113 | ] 114 | 115 | for endpoint in endpoints_to_check: 116 | try: 117 | url = f"{base_url}{endpoint}" 118 | print(f"\nChecking endpoint: {url}") 119 | response = requests.get(url) 120 | print(f"Status: {response.status_code}") 121 | if response.status_code == 200: 122 | print(f"Content: {response.text[:100]}") # Print first 100 chars 123 | except Exception as e: 124 | print(f"Error: {e}") 125 | 126 | # Try the /sse endpoint with proper SSE headers 127 | print("\nChecking SSE endpoint with proper headers...") 128 | try: 129 | sse_url = f"{base_url}/sse" 130 | print(f"Connecting to SSE endpoint: {sse_url}") 131 | 132 | # Setup SSE message handler 133 | def handle_sse_message(message): 134 | print(f"Received SSE message: {message.data}") 135 | 136 | # Use the SSEClient to connect properly 137 | print("Starting SSE connection...") 138 | headers = {"Accept": "text/event-stream"} 139 | response = requests.get(sse_url, headers=headers, stream=True) 140 | client = sseclient.SSEClient(response) 141 | 142 | # Try to 
get the first few events 143 | print("Waiting for SSE events...") 144 | event_count = 0 145 | for event in client.events(): 146 | print(f"Event received: {event.data}") 147 | event_count += 1 148 | if event_count >= 3: # Get at most 3 events 149 | break 150 | 151 | except Exception as e: 152 | print(f"Error with SSE connection: {e}") 153 | 154 | 155 | if __name__ == "__main__": 156 | # Get the server URL from environment or command line 157 | server_url = os.environ.get("SERVER_URL", f"http://localhost:{DEFAULT_MCP_PORT}") 158 | 159 | if len(sys.argv) > 1: 160 | server_url = sys.argv[1] 161 | 162 | test_sse_client(server_url) ``` -------------------------------------------------------------------------------- /scripts/test_quick_query.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Quick test script that runs a single query and exits properly. 4 | Shows that Trino works but is just empty. 5 | """ 6 | import json 7 | import subprocess 8 | import sys 9 | import time 10 | import threading 11 | import os 12 | 13 | def run_quick_query(): 14 | """Run a quick query against Trino via MCP and exit properly.""" 15 | print("🚀 Running quick query test - this should exit cleanly!") 16 | 17 | # Get the current directory for module path 18 | current_dir = os.path.abspath(os.path.dirname(__file__)) 19 | 20 | # Start the server process with STDIO transport 21 | process = subprocess.Popen( 22 | ["python", "src/trino_mcp/server.py", "--transport", "stdio"], 23 | stdin=subprocess.PIPE, 24 | stdout=subprocess.PIPE, 25 | stderr=subprocess.PIPE, 26 | text=True, 27 | bufsize=1, # Line buffered 28 | env=dict(os.environ, PYTHONPATH=os.path.join(current_dir, "src")) 29 | ) 30 | 31 | # Create a thread to read stderr to prevent deadlocks 32 | def read_stderr(): 33 | for line in process.stderr: 34 | print(f"[SERVER] {line.strip()}") 35 | 36 | stderr_thread = threading.Thread(target=read_stderr, daemon=True) 37 
| stderr_thread.start() 38 | 39 | # Wait a bit for the server to start up 40 | time.sleep(2) 41 | 42 | query_response = None 43 | try: 44 | # Send initialize request 45 | initialize_request = { 46 | "jsonrpc": "2.0", 47 | "id": 1, 48 | "method": "initialize", 49 | "params": { 50 | "protocolVersion": "2024-11-05", 51 | "clientInfo": {"name": "quick-query-test", "version": "1.0.0"}, 52 | "capabilities": {"tools": True, "resources": {"supportedSources": ["trino://catalog"]}} 53 | } 54 | } 55 | print(f"Sending initialize request: {json.dumps(initialize_request)}") 56 | process.stdin.write(json.dumps(initialize_request) + "\n") 57 | process.stdin.flush() 58 | 59 | # Read initialize response with timeout 60 | start_time = time.time() 61 | timeout = 5 62 | initialize_response = None 63 | 64 | print("Waiting for initialize response...") 65 | while time.time() - start_time < timeout: 66 | response_line = process.stdout.readline().strip() 67 | if response_line: 68 | print(f"Got response: {response_line}") 69 | try: 70 | initialize_response = json.loads(response_line) 71 | break 72 | except json.JSONDecodeError as e: 73 | print(f"Error parsing response: {e}") 74 | time.sleep(0.1) 75 | 76 | if not initialize_response: 77 | print("❌ Timeout waiting for initialize response") 78 | return 79 | 80 | print(f"✅ Initialize response received: {initialize_response.get('result', {}).get('serverInfo', {}).get('name', 'unknown')}") 81 | 82 | # Send initialized notification with correct format 83 | initialized_notification = { 84 | "jsonrpc": "2.0", 85 | "method": "notifications/initialized", 86 | "params": {} 87 | } 88 | print(f"Sending initialized notification: {json.dumps(initialized_notification)}") 89 | process.stdin.write(json.dumps(initialized_notification) + "\n") 90 | process.stdin.flush() 91 | 92 | # Send query request - intentionally simple query that works with empty memory connector 93 | query_request = { 94 | "jsonrpc": "2.0", 95 | "id": 2, 96 | "method": "tools/call", 97 | 
"params": { 98 | "name": "execute_query", 99 | "arguments": { 100 | "sql": "SELECT 'empty_as_fuck' AS status", 101 | "catalog": "memory" 102 | } 103 | } 104 | } 105 | print(f"Sending query request: {json.dumps(query_request)}") 106 | process.stdin.write(json.dumps(query_request) + "\n") 107 | process.stdin.flush() 108 | 109 | # Read query response with timeout 110 | start_time = time.time() 111 | query_response = None 112 | 113 | print("Waiting for query response...") 114 | while time.time() - start_time < timeout: 115 | response_line = process.stdout.readline().strip() 116 | if response_line: 117 | print(f"Got response: {response_line}") 118 | try: 119 | query_response = json.loads(response_line) 120 | break 121 | except json.JSONDecodeError as e: 122 | print(f"Error parsing response: {e}") 123 | time.sleep(0.1) 124 | 125 | if not query_response: 126 | print("❌ Timeout waiting for query response") 127 | return 128 | 129 | print("\n🔍 QUERY RESULTS:") 130 | 131 | if "error" in query_response: 132 | print(f"❌ Error: {query_response['error']}") 133 | else: 134 | result = query_response.get('result', {}) 135 | print(f"Query ID: {result.get('query_id', 'unknown')}") 136 | print(f"Columns: {result.get('columns', [])}") 137 | print(f"Row count: {result.get('row_count', 0)}") 138 | print(f"Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") 139 | 140 | except Exception as e: 141 | print(f"❌ Exception: {e}") 142 | finally: 143 | # Properly terminate 144 | print("\n👋 Test completed. 
Terminating server process...") 145 | process.terminate() 146 | try: 147 | process.wait(timeout=2) 148 | except subprocess.TimeoutExpired: 149 | process.kill() 150 | 151 | return query_response 152 | 153 | if __name__ == "__main__": 154 | run_quick_query() ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Pytest configuration for the Trino MCP server tests. 3 | """ 4 | 5 | import os 6 | import time 7 | import json 8 | import pytest 9 | import requests 10 | import subprocess 11 | import signal 12 | from typing import Dict, Any, Iterator, Tuple 13 | 14 | # Define constants 15 | TEST_SERVER_PORT = 7000 # Using port 7000 to avoid ALL conflicts with existing containers 16 | TEST_SERVER_URL = f"http://localhost:{TEST_SERVER_PORT}" 17 | TRINO_HOST = os.environ.get("TEST_TRINO_HOST", "localhost") 18 | TRINO_PORT = int(os.environ.get("TEST_TRINO_PORT", "9095")) 19 | TRINO_USER = os.environ.get("TEST_TRINO_USER", "trino") 20 | 21 | 22 | class TrinoMCPTestServer: 23 | """Helper class to manage a test instance of the Trino MCP server.""" 24 | 25 | def __init__(self, port: int = TEST_SERVER_PORT): 26 | self.port = port 27 | self.process = None 28 | 29 | def start(self) -> None: 30 | """Start the server process.""" 31 | cmd = [ 32 | "python", "-m", "trino_mcp.server", 33 | "--transport", "sse", 34 | "--port", str(self.port), 35 | "--trino-host", TRINO_HOST, 36 | "--trino-port", str(TRINO_PORT), 37 | "--trino-user", TRINO_USER, 38 | "--trino-catalog", "memory", 39 | "--debug" 40 | ] 41 | 42 | env = os.environ.copy() 43 | env["PYTHONPATH"] = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 44 | 45 | self.process = subprocess.Popen( 46 | cmd, 47 | env=env, 48 | stdout=subprocess.PIPE, 49 | stderr=subprocess.PIPE, 50 | text=True 51 | ) 52 | 53 | # Wait for server to start 54 | self._wait_for_server() 55 | 56 | def 
stop(self) -> None: 57 | """Stop the server process.""" 58 | if self.process: 59 | self.process.send_signal(signal.SIGINT) 60 | self.process.wait() 61 | self.process = None 62 | 63 | def _wait_for_server(self, max_retries: int = 10, retry_interval: float = 0.5) -> None: 64 | """Wait for the server to become available.""" 65 | for _ in range(max_retries): 66 | try: 67 | response = requests.get(f"{TEST_SERVER_URL}/mcp") 68 | if response.status_code == 200: 69 | return 70 | except requests.exceptions.ConnectionError: 71 | pass 72 | 73 | time.sleep(retry_interval) 74 | 75 | raise TimeoutError(f"Server did not start within {max_retries * retry_interval} seconds") 76 | 77 | 78 | def check_trino_available() -> bool: 79 | """Check if Trino server is available for testing.""" 80 | try: 81 | response = requests.get(f"http://{TRINO_HOST}:{TRINO_PORT}/v1/info") 82 | return response.status_code == 200 83 | except requests.exceptions.ConnectionError: 84 | return False 85 | 86 | 87 | class MCPClient: 88 | """Simple MCP client for testing.""" 89 | 90 | def __init__(self, base_url: str = TEST_SERVER_URL): 91 | self.base_url = base_url 92 | self.next_id = 1 93 | self.initialized = False 94 | 95 | def initialize(self) -> Dict[str, Any]: 96 | """Initialize the MCP session.""" 97 | if self.initialized: 98 | return {"already_initialized": True} 99 | 100 | response = self._send_request("initialize", { 101 | "capabilities": {} 102 | }) 103 | 104 | self.initialized = True 105 | return response 106 | 107 | def list_tools(self) -> Dict[str, Any]: 108 | """List available tools.""" 109 | return self._send_request("tools/list") 110 | 111 | def list_resources(self, source: str = None, path: str = None) -> Dict[str, Any]: 112 | """List resources.""" 113 | params = {} 114 | if source: 115 | params["source"] = source 116 | if path: 117 | params["path"] = path 118 | 119 | return self._send_request("resources/list", params) 120 | 121 | def get_resource(self, source: str, path: str) -> Dict[str, Any]: 
122 | """Get a specific resource.""" 123 | return self._send_request("resources/get", { 124 | "source": source, 125 | "path": path 126 | }) 127 | 128 | def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: 129 | """Call a tool with the given arguments.""" 130 | return self._send_request("tools/call", { 131 | "name": name, 132 | "arguments": arguments 133 | }) 134 | 135 | def shutdown(self) -> Dict[str, Any]: 136 | """Shutdown the MCP session.""" 137 | response = self._send_request("shutdown") 138 | self.initialized = False 139 | return response 140 | 141 | def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: 142 | """Send a JSON-RPC request to the server.""" 143 | request = { 144 | "jsonrpc": "2.0", 145 | "id": self.next_id, 146 | "method": method 147 | } 148 | 149 | if params is not None: 150 | request["params"] = params 151 | 152 | self.next_id += 1 153 | 154 | response = requests.post( 155 | f"{self.base_url}/mcp/message", 156 | json=request 157 | ) 158 | 159 | if response.status_code != 200: 160 | raise Exception(f"Request failed with status {response.status_code}: {response.text}") 161 | 162 | return response.json() 163 | 164 | 165 | @pytest.fixture(scope="session") 166 | def trino_available() -> bool: 167 | """Check if Trino is available.""" 168 | available = check_trino_available() 169 | if not available: 170 | pytest.skip("Trino server is not available for testing") 171 | return available 172 | 173 | 174 | @pytest.fixture(scope="session") 175 | def mcp_server(trino_available) -> Iterator[None]: 176 | """ 177 | Start a test instance of the Trino MCP server for the test session. 178 | 179 | Args: 180 | trino_available: Fixture to ensure Trino is available. 
181 | 182 | Yields: 183 | None 184 | """ 185 | server = TrinoMCPTestServer() 186 | try: 187 | server.start() 188 | yield 189 | finally: 190 | server.stop() 191 | 192 | 193 | @pytest.fixture 194 | def mcp_client(mcp_server) -> Iterator[MCPClient]: 195 | """ 196 | Create a test MCP client connected to the test server. 197 | 198 | Args: 199 | mcp_server: The server fixture. 200 | 201 | Yields: 202 | MCPClient: An initialized MCP client. 203 | """ 204 | client = MCPClient() 205 | client.initialize() 206 | try: 207 | yield client 208 | finally: 209 | try: 210 | client.shutdown() 211 | except: 212 | pass # Ignore errors during shutdown ``` -------------------------------------------------------------------------------- /src/trino_mcp/tools/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | MCP tools for executing operations on Trino. 3 | """ 4 | from dataclasses import dataclass 5 | from typing import Any, Dict, List, Optional, Union 6 | 7 | from loguru import logger 8 | from mcp.server.fastmcp import Context, FastMCP 9 | 10 | from trino_mcp.trino_client import TrinoClient 11 | 12 | 13 | def register_trino_tools(mcp: FastMCP, client: TrinoClient) -> None: 14 | """ 15 | Register Trino tools with the MCP server. 16 | 17 | Args: 18 | mcp: The MCP server instance. 19 | client: The Trino client instance. 20 | """ 21 | 22 | @mcp.tool() 23 | def execute_query( 24 | sql: str, 25 | catalog: Optional[str] = None, 26 | schema: Optional[str] = None 27 | ) -> Dict[str, Any]: 28 | """ 29 | Execute a SQL query against Trino. 30 | 31 | Args: 32 | sql: The SQL query to execute. 33 | catalog: Optional catalog name to use for the query. 34 | schema: Optional schema name to use for the query. 35 | 36 | Returns: 37 | Dict[str, Any]: Query results including metadata. 
38 | """ 39 | logger.info(f"Executing query: {sql}") 40 | 41 | try: 42 | result = client.execute_query(sql, catalog, schema) 43 | 44 | # Format the result in a structured way 45 | formatted_result = { 46 | "query_id": result.query_id, 47 | "columns": result.columns, 48 | "row_count": result.row_count, 49 | "query_time_ms": result.query_time_ms 50 | } 51 | 52 | # Add preview of results (first 20 rows) 53 | preview_rows = [] 54 | max_preview_rows = min(20, len(result.rows)) 55 | 56 | for i in range(max_preview_rows): 57 | row_dict = {} 58 | for j, col in enumerate(result.columns): 59 | row_dict[col] = result.rows[i][j] 60 | preview_rows.append(row_dict) 61 | 62 | formatted_result["preview_rows"] = preview_rows 63 | 64 | # Include a resource path for full results 65 | formatted_result["resource_path"] = f"trino://query/{result.query_id}" 66 | 67 | return formatted_result 68 | 69 | except Exception as e: 70 | error_msg = str(e) 71 | logger.error(f"Query execution failed: {error_msg}") 72 | return { 73 | "error": error_msg, 74 | "query": sql 75 | } 76 | 77 | @mcp.tool() 78 | def cancel_query(query_id: str) -> Dict[str, Any]: 79 | """ 80 | Cancel a running query. 81 | 82 | Args: 83 | query_id: ID of the query to cancel. 84 | 85 | Returns: 86 | Dict[str, Any]: Result of the cancellation operation. 
87 | """ 88 | logger.info(f"Cancelling query: {query_id}") 89 | 90 | try: 91 | success = client.cancel_query(query_id) 92 | 93 | if success: 94 | return { 95 | "success": True, 96 | "message": f"Query {query_id} cancelled successfully" 97 | } 98 | else: 99 | return { 100 | "success": False, 101 | "message": f"Failed to cancel query {query_id}" 102 | } 103 | 104 | except Exception as e: 105 | error_msg = str(e) 106 | logger.error(f"Query cancellation failed: {error_msg}") 107 | return { 108 | "success": False, 109 | "error": error_msg, 110 | "query_id": query_id 111 | } 112 | 113 | @mcp.tool() 114 | def inspect_table( 115 | catalog: str, 116 | schema: str, 117 | table: str 118 | ) -> Dict[str, Any]: 119 | """ 120 | Get detailed metadata about a table. 121 | 122 | Args: 123 | catalog: Catalog name. 124 | schema: Schema name. 125 | table: Table name. 126 | 127 | Returns: 128 | Dict[str, Any]: Table metadata including columns, statistics, etc. 129 | """ 130 | logger.info(f"Inspecting table: {catalog}.{schema}.{table}") 131 | 132 | try: 133 | table_details = client.get_table_details(catalog, schema, table) 134 | 135 | # Try to get a row count (this might not work on all connectors) 136 | try: 137 | count_result = client.execute_query( 138 | f"SELECT count(*) AS row_count FROM {catalog}.{schema}.{table}" 139 | ) 140 | if count_result.rows and count_result.rows[0]: 141 | table_details["row_count"] = count_result.rows[0][0] 142 | except Exception as e: 143 | logger.warning(f"Failed to get row count: {e}") 144 | 145 | # Get additional info from the information_schema if available 146 | try: 147 | info_schema_query = f""" 148 | SELECT column_name, data_type, is_nullable, column_default 149 | FROM {catalog}.information_schema.columns 150 | WHERE table_catalog = '{catalog}' 151 | AND table_schema = '{schema}' 152 | AND table_name = '{table}' 153 | """ 154 | info_schema_result = client.execute_query(info_schema_query) 155 | 156 | enhanced_columns = [] 157 | for col in 
table_details["columns"]: 158 | enhanced_col = col.copy() 159 | 160 | # Find matching info_schema row 161 | for row in info_schema_result.rows: 162 | if row[0] == col["name"]: 163 | enhanced_col["data_type"] = row[1] 164 | enhanced_col["is_nullable"] = row[2] 165 | enhanced_col["default"] = row[3] 166 | break 167 | 168 | enhanced_columns.append(enhanced_col) 169 | 170 | table_details["columns"] = enhanced_columns 171 | except Exception as e: 172 | logger.warning(f"Failed to get column details from information_schema: {e}") 173 | 174 | return table_details 175 | 176 | except Exception as e: 177 | error_msg = str(e) 178 | logger.error(f"Table inspection failed: {error_msg}") 179 | return { 180 | "error": error_msg, 181 | "catalog": catalog, 182 | "schema": schema, 183 | "table": table 184 | } 185 | ``` -------------------------------------------------------------------------------- /llm_query_trino.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Simple wrapper script for LLMs to query Trino through MCP. 4 | This script handles all the MCP protocol complexity, so the LLM only needs to focus on SQL. 5 | 6 | Usage: 7 | python llm_query_trino.py "SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5" 8 | """ 9 | import json 10 | import subprocess 11 | import sys 12 | import time 13 | from typing import Dict, Any, List, Optional 14 | 15 | # Default configurations - modify as needed 16 | DEFAULT_CATALOG = "memory" 17 | DEFAULT_SCHEMA = "bullshit" 18 | 19 | def query_trino(sql_query: str, catalog: str = DEFAULT_CATALOG, schema: Optional[str] = DEFAULT_SCHEMA) -> Dict[str, Any]: 20 | """ 21 | Run a SQL query against Trino through MCP and return the results. 
22 | 23 | Args: 24 | sql_query: The SQL query to execute 25 | catalog: Catalog name (default: memory) 26 | schema: Schema name (default: bullshit) 27 | 28 | Returns: 29 | Dictionary with query results or error 30 | """ 31 | print(f"\n🔍 Running query via Trino MCP:\n{sql_query}") 32 | 33 | # Start the MCP server with STDIO transport 34 | cmd = [ 35 | "docker", "exec", "-i", "trino_mcp_trino-mcp_1", 36 | "python", "-m", "trino_mcp.server", 37 | "--transport", "stdio", 38 | "--debug", 39 | "--trino-host", "trino", 40 | "--trino-port", "8080", 41 | "--trino-user", "trino", 42 | "--trino-catalog", catalog 43 | ] 44 | 45 | try: 46 | process = subprocess.Popen( 47 | cmd, 48 | stdin=subprocess.PIPE, 49 | stdout=subprocess.PIPE, 50 | stderr=subprocess.PIPE, 51 | text=True, 52 | bufsize=1 53 | ) 54 | 55 | # Wait for MCP server to start 56 | time.sleep(2) 57 | 58 | # Helper function to send requests 59 | def send_request(request, expect_response=True): 60 | request_str = json.dumps(request) + "\n" 61 | process.stdin.write(request_str) 62 | process.stdin.flush() 63 | 64 | if not expect_response: 65 | return None 66 | 67 | response_str = process.stdout.readline() 68 | if response_str: 69 | return json.loads(response_str) 70 | return None 71 | 72 | # Step 1: Initialize MCP 73 | init_request = { 74 | "jsonrpc": "2.0", 75 | "id": 1, 76 | "method": "initialize", 77 | "params": { 78 | "protocolVersion": "2024-11-05", 79 | "clientInfo": { 80 | "name": "llm-query-client", 81 | "version": "1.0.0" 82 | }, 83 | "capabilities": { 84 | "tools": True 85 | } 86 | } 87 | } 88 | 89 | init_response = send_request(init_request) 90 | if not init_response: 91 | return {"error": "Failed to initialize MCP"} 92 | 93 | # Step 2: Send initialized notification 94 | init_notification = { 95 | "jsonrpc": "2.0", 96 | "method": "notifications/initialized", 97 | "params": {} 98 | } 99 | 100 | send_request(init_notification, expect_response=False) 101 | 102 | # Step 3: Execute query 103 | query_args = {"sql": 
sql_query, "catalog": catalog} 104 | if schema: 105 | query_args["schema"] = schema 106 | 107 | query_request = { 108 | "jsonrpc": "2.0", 109 | "id": 2, 110 | "method": "tools/call", 111 | "params": { 112 | "name": "execute_query", 113 | "arguments": query_args 114 | } 115 | } 116 | 117 | query_response = send_request(query_request) 118 | if not query_response: 119 | return {"error": "No response received for query"} 120 | 121 | if "error" in query_response: 122 | return {"error": query_response["error"]} 123 | 124 | # Step 4: Parse the content 125 | try: 126 | # Extract nested result content 127 | content_text = query_response.get("result", {}).get("content", [{}])[0].get("text", "{}") 128 | result_data = json.loads(content_text) 129 | 130 | # Clean up the results for easier consumption 131 | return { 132 | "success": True, 133 | "query_id": result_data.get("query_id", "unknown"), 134 | "columns": result_data.get("columns", []), 135 | "row_count": result_data.get("row_count", 0), 136 | "rows": result_data.get("preview_rows", []), 137 | "execution_time_ms": result_data.get("query_time_ms", 0) 138 | } 139 | except Exception as e: 140 | return { 141 | "error": f"Error parsing results: {str(e)}", 142 | "raw_response": query_response 143 | } 144 | 145 | except Exception as e: 146 | return {"error": f"Error: {str(e)}"} 147 | finally: 148 | if 'process' in locals() and process.poll() is None: 149 | process.terminate() 150 | try: 151 | process.wait(timeout=5) 152 | except subprocess.TimeoutExpired: 153 | process.kill() 154 | 155 | def format_results(results: Dict[str, Any]) -> str: 156 | """Format query results for display""" 157 | if "error" in results: 158 | return f"❌ Error: {results['error']}" 159 | 160 | if not results.get("success"): 161 | return f"❌ Query failed: {results}" 162 | 163 | output = [ 164 | f"✅ Query executed successfully!", 165 | f"📊 Rows: {results['row_count']}", 166 | f"⏱️ Execution Time: {results['execution_time_ms']:.2f}ms", 167 | f"\nColumns: {', 
'.join(results['columns'])}", 168 | f"\nResults:" 169 | ] 170 | 171 | # Table header 172 | if results["columns"]: 173 | header = " | ".join(f"{col.upper()}" for col in results["columns"]) 174 | output.append(header) 175 | output.append("-" * len(header)) 176 | 177 | # Table rows 178 | for row in results["rows"]: 179 | values = [] 180 | for col in results["columns"]: 181 | values.append(str(row.get(col, "NULL"))) 182 | output.append(" | ".join(values)) 183 | 184 | return "\n".join(output) 185 | 186 | def main(): 187 | """Run a query from command line arguments""" 188 | if len(sys.argv) < 2: 189 | print("Usage: python llm_query_trino.py 'SELECT * FROM memory.bullshit.real_bullshit_data LIMIT 5'") 190 | sys.exit(1) 191 | 192 | # Get the SQL query from command line 193 | sql_query = sys.argv[1] 194 | 195 | # Parse optional catalog and schema 196 | catalog = DEFAULT_CATALOG 197 | schema = DEFAULT_SCHEMA 198 | 199 | if len(sys.argv) > 2: 200 | catalog = sys.argv[2] 201 | 202 | if len(sys.argv) > 3: 203 | schema = sys.argv[3] 204 | 205 | # Execute the query 206 | results = query_trino(sql_query, catalog, schema) 207 | 208 | # Print formatted results 209 | print(format_results(results)) 210 | 211 | if __name__ == "__main__": 212 | main() ``` -------------------------------------------------------------------------------- /load_bullshit_data.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Quick script to load our bullshit data directly into Trino using the memory connector 4 | instead of relying on the Hive metastore which seems to be having issues. 
5 | """ 6 | import pandas as pd 7 | import trino 8 | import time 9 | import sys 10 | 11 | # Configure Trino connection 12 | TRINO_HOST = "localhost" 13 | TRINO_PORT = 9095 14 | TRINO_USER = "trino" 15 | TRINO_CATALOG = "memory" 16 | 17 | def main(): 18 | print("🚀 Loading bullshit data into Trino...") 19 | 20 | # Load the parquet data 21 | try: 22 | print("Reading the bullshit data...") 23 | df = pd.read_parquet('data/bullshit_data.parquet') 24 | print(f"Loaded {len(df)} rows of bullshit data") 25 | except Exception as e: 26 | print(f"❌ Failed to load parquet data: {e}") 27 | sys.exit(1) 28 | 29 | # Connect to Trino 30 | print(f"Connecting to Trino at {TRINO_HOST}:{TRINO_PORT}...") 31 | 32 | # Try to connect with retries 33 | max_attempts = 10 34 | for attempt in range(1, max_attempts + 1): 35 | try: 36 | conn = trino.dbapi.connect( 37 | host=TRINO_HOST, 38 | port=TRINO_PORT, 39 | user=TRINO_USER, 40 | catalog=TRINO_CATALOG 41 | ) 42 | print("✅ Connected to Trino") 43 | break 44 | except Exception as e: 45 | print(f"Attempt {attempt}/{max_attempts} - Failed to connect: {e}") 46 | if attempt == max_attempts: 47 | print("❌ Could not connect to Trino after multiple attempts") 48 | sys.exit(1) 49 | time.sleep(2) 50 | 51 | # Create cursor 52 | cursor = conn.cursor() 53 | 54 | try: 55 | # Create a schema 56 | print("Creating bullshit schema...") 57 | cursor.execute("CREATE SCHEMA IF NOT EXISTS memory.bullshit") 58 | 59 | # Drop tables if they exist (memory connector doesn't support CREATE OR REPLACE) 60 | print("Dropping existing tables if they exist...") 61 | try: 62 | cursor.execute("DROP TABLE IF EXISTS memory.bullshit.bullshit_data") 63 | cursor.execute("DROP TABLE IF EXISTS memory.bullshit.real_bullshit_data") 64 | cursor.execute("DROP VIEW IF EXISTS memory.bullshit.bullshit_summary") 65 | except Exception as e: 66 | print(f"Warning during table drops: {e}") 67 | 68 | # Create a sample table for our bullshit data 69 | print("Creating sample bullshit_data table...") 
70 | cursor.execute(""" 71 | CREATE TABLE memory.bullshit.bullshit_data ( 72 | id BIGINT, 73 | job_title VARCHAR, 74 | name VARCHAR, 75 | salary BIGINT, 76 | bullshit_factor INTEGER, 77 | boolean_flag BOOLEAN, 78 | enum_field VARCHAR 79 | ) 80 | """) 81 | 82 | # Insert sample data 83 | print("Inserting sample data...") 84 | cursor.execute(""" 85 | INSERT INTO memory.bullshit.bullshit_data VALUES 86 | (1, 'CEO', 'Sample Data', 250000, 10, TRUE, 'Option A'), 87 | (2, 'CTO', 'More Examples', 225000, 8, TRUE, 'Option B'), 88 | (3, 'Developer', 'Testing Data', 120000, 5, FALSE, 'Option C') 89 | """) 90 | 91 | # Now we'll load real data from our dataframe 92 | # For memory connector, we need to create a new table with the data 93 | print("Creating real_bullshit_data table with our generated data...") 94 | 95 | # Take a subset of columns for simplicity 96 | cols = ['id', 'name', 'job_title', 'salary', 'bullshit_factor', 'bullshit_statement', 'company'] 97 | df_subset = df[cols].head(100) # Take just 100 rows to keep it manageable 98 | 99 | # Handle NULL values - replace with empty strings for strings and 0 for numbers 100 | df_subset = df_subset.fillna({ 101 | 'name': 'Anonymous', 102 | 'job_title': 'Unknown', 103 | 'bullshit_statement': 'No statement', 104 | 'company': 'Unknown Co' 105 | }) 106 | df_subset = df_subset.fillna(0) 107 | 108 | # Create the table structure 109 | cursor.execute(""" 110 | CREATE TABLE memory.bullshit.real_bullshit_data ( 111 | id BIGINT, 112 | job_title VARCHAR, 113 | name VARCHAR, 114 | salary DOUBLE, 115 | bullshit_factor DOUBLE, 116 | bullshit_statement VARCHAR, 117 | company VARCHAR 118 | ) 119 | """) 120 | 121 | # Insert data in batches to avoid overly long SQL statements 122 | batch_size = 10 123 | total_batches = (len(df_subset) + batch_size - 1) // batch_size # Ceiling division 124 | 125 | print(f"Inserting {len(df_subset)} rows in {total_batches} batches...") 126 | 127 | for batch_num in range(total_batches): 128 | start_idx = 
batch_num * batch_size 129 | end_idx = min(start_idx + batch_size, len(df_subset)) 130 | batch = df_subset.iloc[start_idx:end_idx] 131 | 132 | # Create VALUES part of SQL statement for this batch 133 | values_list = [] 134 | for _, row in batch.iterrows(): 135 | # Clean string values to prevent SQL injection/syntax errors 136 | job_title = str(row['job_title']).replace("'", "''") 137 | name = str(row['name']).replace("'", "''") 138 | statement = str(row['bullshit_statement']).replace("'", "''") 139 | company = str(row['company']).replace("'", "''") 140 | 141 | values_str = f"({row['id']}, '{job_title}', '{name}', {row['salary']}, {row['bullshit_factor']}, '{statement}', '{company}')" 142 | values_list.append(values_str) 143 | 144 | values_sql = ", ".join(values_list) 145 | 146 | # Insert batch 147 | insert_sql = f""" 148 | INSERT INTO memory.bullshit.real_bullshit_data VALUES 149 | {values_sql} 150 | """ 151 | cursor.execute(insert_sql) 152 | 153 | print(f"Batch {batch_num+1}/{total_batches} inserted.") 154 | 155 | # Create a summary view 156 | print("Creating summary view...") 157 | cursor.execute(""" 158 | CREATE VIEW memory.bullshit.bullshit_summary AS 159 | SELECT 160 | job_title, 161 | COUNT(*) as count, 162 | AVG(salary) as avg_salary, 163 | AVG(bullshit_factor) as avg_bs_factor 164 | FROM 165 | memory.bullshit.real_bullshit_data 166 | GROUP BY 167 | job_title 168 | """) 169 | 170 | # Query to verify 171 | print("\nVerifying data with a query:") 172 | cursor.execute("SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC") 173 | 174 | # Print results 175 | columns = [desc[0] for desc in cursor.description] 176 | print("\n" + " | ".join(columns)) 177 | print("-" * 80) 178 | 179 | rows = cursor.fetchall() 180 | for row in rows: 181 | print(" | ".join(str(cell) for cell in row)) 182 | 183 | print(f"\n✅ Successfully loaded {len(df_subset)} rows of bullshit data into Trino!") 184 | print("You can now query it with: SELECT * FROM 
memory.bullshit.real_bullshit_data") 185 | 186 | except Exception as e: 187 | print(f"❌ Error: {e}") 188 | finally: 189 | cursor.close() 190 | conn.close() 191 | print("Connection closed") 192 | 193 | if __name__ == "__main__": 194 | main() ``` -------------------------------------------------------------------------------- /scripts/test_stdio_trino.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Test script for the Trino MCP protocol using STDIO transport. 4 | This avoids the SSE transport issues we're encountering. 5 | """ 6 | import json 7 | import os 8 | import subprocess 9 | import sys 10 | import time 11 | from typing import Dict, Any, Optional, List 12 | 13 | def test_mcp_stdio(): 14 | """ 15 | Test the MCP protocol using a subprocess with STDIO transport. 16 | """ 17 | print("🚀 Testing Trino MCP with STDIO transport...") 18 | 19 | # Start the MCP server in a subprocess with STDIO transport 20 | print("Starting MCP server with STDIO transport...") 21 | mcp_server_cmd = [ 22 | "docker", "exec", "-it", "trino_mcp_trino-mcp_1", 23 | "python", "-m", "trino_mcp.server", "--transport", "stdio" 24 | ] 25 | 26 | process = subprocess.Popen( 27 | mcp_server_cmd, 28 | stdin=subprocess.PIPE, 29 | stdout=subprocess.PIPE, 30 | stderr=subprocess.PIPE, 31 | text=True, 32 | bufsize=1 # Line buffered 33 | ) 34 | 35 | # Helper function to send a request and get a response 36 | def send_request(request: Dict[str, Any]) -> Optional[Dict[str, Any]]: 37 | request_str = json.dumps(request) + "\n" 38 | print(f"\n📤 Sending: {request_str.strip()}") 39 | process.stdin.write(request_str) 40 | process.stdin.flush() 41 | 42 | # Read response with timeout 43 | start_time = time.time() 44 | timeout = 10 # 10 seconds timeout 45 | response_str = None 46 | 47 | while time.time() - start_time < timeout: 48 | response_str = process.stdout.readline().strip() 49 | if response_str: 50 | print(f"📩 Received: 
{response_str}") 51 | try: 52 | return json.loads(response_str) 53 | except json.JSONDecodeError as e: 54 | print(f"❌ Error parsing response as JSON: {e}") 55 | time.sleep(0.1) 56 | 57 | print("⏱️ Timeout waiting for response") 58 | return None 59 | 60 | # Read any startup output to clear the buffer 61 | print("Waiting for server startup...") 62 | time.sleep(2) # Give the server time to start up 63 | 64 | # Initialize the protocol 65 | print("\n=== Step 1: Initialize MCP ===") 66 | initialize_request = { 67 | "jsonrpc": "2.0", 68 | "id": 1, 69 | "method": "initialize", 70 | "params": { 71 | "protocolVersion": "2024-11-05", 72 | "clientInfo": { 73 | "name": "trino-mcp-test-client", 74 | "version": "1.0.0" 75 | }, 76 | "capabilities": { 77 | "tools": True, 78 | "resources": { 79 | "supportedSources": ["trino://catalog"] 80 | } 81 | } 82 | } 83 | } 84 | 85 | initialize_response = send_request(initialize_request) 86 | if not initialize_response: 87 | print("❌ Failed to initialize MCP") 88 | process.terminate() 89 | return 90 | 91 | print("✅ MCP initialized successfully") 92 | print(f"Server info: {json.dumps(initialize_response.get('result', {}).get('serverInfo', {}), indent=2)}") 93 | 94 | # Send initialized notification 95 | print("\n=== Step 2: Send initialized notification ===") 96 | initialized_notification = { 97 | "jsonrpc": "2.0", 98 | "method": "initialized" 99 | } 100 | 101 | _ = send_request(initialized_notification) 102 | print("✅ Initialized notification sent") 103 | 104 | # Get available tools 105 | print("\n=== Step 3: List available tools ===") 106 | tools_request = { 107 | "jsonrpc": "2.0", 108 | "id": 2, 109 | "method": "tools/list" 110 | } 111 | 112 | tools_response = send_request(tools_request) 113 | if not tools_response or "result" not in tools_response: 114 | print("❌ Failed to get tools list") 115 | process.terminate() 116 | return 117 | 118 | tools = tools_response.get("result", {}).get("tools", []) 119 | print(f"✅ Available tools: 
{len(tools)}") 120 | for tool in tools: 121 | print(f" - {tool.get('name')}: {tool.get('description')}") 122 | 123 | # Execute a simple query 124 | print("\n=== Step 4: Execute a query ===") 125 | query_request = { 126 | "jsonrpc": "2.0", 127 | "id": 3, 128 | "method": "tools/call", 129 | "params": { 130 | "name": "execute_query", 131 | "arguments": { 132 | "sql": "SELECT * FROM memory.bullshit.bullshit_data", 133 | "catalog": "memory" 134 | } 135 | } 136 | } 137 | 138 | query_response = send_request(query_request) 139 | if not query_response: 140 | print("❌ Failed to execute query") 141 | elif "error" in query_response: 142 | print(f"❌ Query error: {query_response.get('error')}") 143 | else: 144 | result = query_response.get("result", {}) 145 | row_count = result.get("row_count", 0) 146 | print(f"✅ Query executed successfully with {row_count} rows") 147 | 148 | # Print columns and preview rows 149 | print(f"Columns: {', '.join(result.get('columns', []))}") 150 | print("Preview rows:") 151 | for row in result.get("preview_rows", []): 152 | print(f" {row}") 153 | 154 | # Execute a summary query 155 | print("\n=== Step 5: Query the summary view ===") 156 | summary_request = { 157 | "jsonrpc": "2.0", 158 | "id": 4, 159 | "method": "tools/call", 160 | "params": { 161 | "name": "execute_query", 162 | "arguments": { 163 | "sql": "SELECT * FROM memory.bullshit.bullshit_summary ORDER BY count DESC", 164 | "catalog": "memory" 165 | } 166 | } 167 | } 168 | 169 | summary_response = send_request(summary_request) 170 | if not summary_response: 171 | print("❌ Failed to execute summary query") 172 | elif "error" in summary_response: 173 | print(f"❌ Summary query error: {summary_response.get('error')}") 174 | else: 175 | result = summary_response.get("result", {}) 176 | row_count = result.get("row_count", 0) 177 | print(f"✅ Summary query executed successfully with {row_count} rows") 178 | 179 | # Print columns and preview rows 180 | print(f"Columns: {', 
'.join(result.get('columns', []))}") 181 | print("Summary data:") 182 | for row in result.get("preview_rows", []): 183 | print(f" {row}") 184 | 185 | # List available resources 186 | print("\n=== Step 6: List available resources ===") 187 | resources_request = { 188 | "jsonrpc": "2.0", 189 | "id": 5, 190 | "method": "resources/list", 191 | "params": { 192 | "source": "trino://catalog" 193 | } 194 | } 195 | 196 | resources_response = send_request(resources_request) 197 | if not resources_response or "result" not in resources_response: 198 | print("❌ Failed to get resources list") 199 | else: 200 | resources = resources_response.get("result", {}).get("resources", []) 201 | print(f"✅ Available resources: {len(resources)}") 202 | for resource in resources: 203 | print(f" - {resource}") 204 | 205 | # Clean up the process 206 | print("\n=== Finishing test ===") 207 | process.terminate() 208 | try: 209 | process.wait(timeout=5) 210 | print("✅ MCP server process terminated") 211 | except subprocess.TimeoutExpired: 212 | print("⚠️ Had to force kill the MCP server process") 213 | process.kill() 214 | 215 | # Check for errors in stderr 216 | stderr = process.stderr.read() 217 | if stderr: 218 | print("\n⚠️ Server stderr output:") 219 | print(stderr) 220 | 221 | print("\n🏁 Test completed!") 222 | 223 | if __name__ == "__main__": 224 | test_mcp_stdio() ``` -------------------------------------------------------------------------------- /scripts/test_fixed_client.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Fixed test script for the MCP server that uses the correct notification format. 4 | This should actually work with MCP 1.3.0. 5 | """ 6 | import json 7 | import requests 8 | import sys 9 | import time 10 | import sseclient 11 | from rich.console import Console 12 | 13 | console = Console() 14 | 15 | def test_mcp(): 16 | """ 17 | Test the MCP server with proper message formats. 
    Fixes the notification format to work with MCP 1.3.0.
    """
    console.print("[bold green]🚀 Starting MCP client test with fixed notification format[/]")

    # Connect to SSE endpoint
    console.print("[bold blue]Connecting to SSE endpoint...[/]")
    headers = {"Accept": "text/event-stream"}
    sse_response = requests.get("http://localhost:9096/sse", headers=headers, stream=True)
    client = sseclient.SSEClient(sse_response)

    # Get the messages URL from the first event
    messages_url = None
    session_id = None

    for event in client.events():
        console.print(f"[cyan]SSE event:[/] {event.event}")
        console.print(f"[cyan]SSE data:[/] {event.data}")

        if event.event == "endpoint":
            messages_url = f"http://localhost:9096{event.data}"
            # Extract session ID from URL
            if "session_id=" in event.data:
                session_id = event.data.split("session_id=")[1]
            console.print(f"[green]Got messages URL:[/] {messages_url}")
            console.print(f"[green]Session ID:[/] {session_id}")
            break

    if not messages_url:
        console.print("[bold red]Failed to get messages URL from SSE[/]")
        return

    # Now we have the messages URL, send initialize request
    console.print(f"\n[bold blue]Sending initialize request to {messages_url}[/]")
    initialize_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "clientInfo": {
                "name": "fixed-test-client",
                "version": "1.0.0"
            },
            "capabilities": {
                "tools": True,
                "resources": {
                    "supportedSources": ["trino://catalog"]
                }
            }
        }
    }

    try:
        response = requests.post(messages_url, json=initialize_request)
        console.print(f"[cyan]Status code:[/] {response.status_code}")
        console.print(f"[cyan]Response:[/] {response.text}")

        # Continue listening for events to get the response
        console.print("\n[bold blue]Listening for response events...[/]")

        # Start a timeout counter
        start_time = time.time()
        timeout = 30  # 30 seconds timeout

        # Keep listening for events.
        # NOTE(review): indentation below was reconstructed from a mangled dump;
        # the id==1 / id==2 / id==3 handlers are assumed to be sibling checks
        # inside the message-parsing try block — confirm against the repository.
        for event in client.events():
            # Skip ping events
            if event.event == "ping":
                continue

            console.print(f"[magenta]Event type:[/] {event.event}")
            console.print(f"[magenta]Event data:[/] {event.data}")

            # If we get a message event, parse it
            if event.event == "message" and event.data:
                try:
                    data = json.loads(event.data)
                    console.print(f"[green]Parsed message:[/] {json.dumps(data, indent=2)}")

                    # Check if this is a response to our initialize request
                    if "id" in data and data["id"] == 1:
                        # Send an initialization notification with CORRECT FORMAT
                        console.print("\n[bold blue]Sending initialized notification with correct format...[/]")
                        initialized_notification = {
                            "jsonrpc": "2.0",
                            "method": "notifications/initialized",  # FIXED: correct method name
                            "params": {}  # FIXED: added required params
                        }
                        response = requests.post(messages_url, json=initialized_notification)
                        console.print(f"[cyan]Status code:[/] {response.status_code}")
                        console.print(f"[cyan]Response:[/] {response.text}")

                        # Now send a tools/list request
                        console.print("\n[bold blue]Sending tools/list request...[/]")
                        tools_request = {
                            "jsonrpc": "2.0",
                            "id": 2,
                            "method": "tools/list"
                        }
                        response = requests.post(messages_url, json=tools_request)
                        console.print(f"[cyan]Status code:[/] {response.status_code}")
                        console.print(f"[cyan]Response:[/] {response.text}")

                    # Check if this is a response to our tools/list request
                    if "id" in data and data["id"] == 2:
                        # Now send a resources/list request for trino catalogs
                        console.print("\n[bold blue]Sending resources/list request for trino catalogs...[/]")
                        resources_request = {
                            "jsonrpc": "2.0",
                            "id": 3,
                            "method": "resources/list",
                            "params": {
                                "source": "trino://catalog"
                            }
                        }
                        response = requests.post(messages_url, json=resources_request)
                        console.print(f"[cyan]Status code:[/] {response.status_code}")
                        console.print(f"[cyan]Response:[/] {response.text}")

                    # If we get the resource list, try to execute a query
                    if "id" in data and data["id"] == 3:
                        console.print("\n[bold green]🔥 Got resources! Now trying to execute a query...[/]")
                        query_request = {
                            "jsonrpc": "2.0",
                            "id": 4,
                            "method": "tools/call",
                            "params": {
                                "name": "execute_query",
                                "arguments": {
                                    "sql": "SELECT 1 AS test_value, 'it works!' AS message",
                                    "catalog": "memory"
                                }
                            }
                        }
                        response = requests.post(messages_url, json=query_request)
                        console.print(f"[cyan]Status code:[/] {response.status_code}")
                        console.print(f"[cyan]Response:[/] {response.text}")

                except Exception as e:
                    console.print(f"[bold red]Error parsing message:[/] {e}")

            # Check timeout
            if time.time() - start_time > timeout:
                console.print("[bold yellow]Timeout waiting for response[/]")
                break

    except KeyboardInterrupt:
        console.print("\n[bold yellow]Exiting...[/]")
    except Exception as e:
        console.print(f"[bold red]Error:[/] {e}")
    finally:
        # Close the SSE connection
        sse_response.close()
        console.print("[bold green]Test completed. Connection closed.[/]")

if __name__ == "__main__":
    test_mcp()
```

--------------------------------------------------------------------------------
/tools/create_bullshit_data.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Create a bullshit parquet file full of random silly data for Trino to query.
4 | """ 5 | import os 6 | import pandas as pd 7 | import numpy as np 8 | from datetime import datetime, timedelta 9 | import random 10 | import string 11 | 12 | # Make this shit reproducible 13 | random.seed(42069) 14 | np.random.seed(42069) 15 | 16 | def random_company_name(): 17 | """Generate a ridiculous startup name.""" 18 | prefixes = ["Block", "Hash", "Crypto", "Data", "Quantum", "Neural", "Cloud", "Cyber", "Meta", "Digital", 19 | "AI", "ML", "Algo", "Bit", "Logic", "Hyper", "Ultra", "Deep", "Sync", "Tech"] 20 | suffixes = ["Chain", "Flow", "Mind", "Logic", "Base", "Scale", "Cube", "Stream", "Grid", "Verse", 21 | "Net", "Ware", "Hub", "Pulse", "Sense", "Node", "Edge", "Core", "Link", "Matrix"] 22 | 23 | return f"{random.choice(prefixes)}{random.choice(suffixes)}" 24 | 25 | def random_bullshit_job_title(): 26 | """Generate a bullshit job title.""" 27 | prefix = ["Chief", "Senior", "Lead", "Global", "Dynamic", "Principal", "Executive", "Head of", 28 | "Director of", "VP of", "Distinguished", "Advanced", "Master", "Innovation", "Transformation"] 29 | middle = ["Digital", "Data", "Blockchain", "AI", "Experience", "Product", "Solutions", "Technical", 30 | "Strategic", "Cloud", "Enterprise", "Creative", "Platform", "Innovation", "Disruption"] 31 | suffix = ["Officer", "Architect", "Evangelist", "Guru", "Ninja", "Rockstar", "Wizard", "Jedi", 32 | "Explorer", "Catalyst", "Visionary", "Storyteller", "Hacker", "Champion", "Designer"] 33 | 34 | return f"{random.choice(prefix)} {random.choice(middle)} {random.choice(suffix)}" 35 | 36 | def random_email(name): 37 | """Generate a random email based on a name.""" 38 | domains = ["gmail.com", "hotmail.com", "yahoo.com", "outlook.com", "icloud.com", 39 | "protonmail.com", "example.com", "bullshit.io", "fakeaf.dev", "notreal.net"] 40 | 41 | name_part = name.lower().replace(" ", "") 42 | return f"{name_part}{random.randint(1, 999)}@{random.choice(domains)}" 43 | 44 | def random_ip(): 45 | """Generate a random IP address.""" 46 
| return f"{random.randint(1, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}" 47 | 48 | def random_name(): 49 | """Generate a random person name.""" 50 | first_names = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda", 51 | "William", "Elizabeth", "David", "Susan", "Richard", "Jessica", "Joseph", "Sarah", 52 | "Thomas", "Karen", "Charles", "Nancy", "Skyler", "Jesse", "Walter", "Saul", "Mike"] 53 | 54 | last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", 55 | "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", 56 | "White", "Goodman", "Pinkman", "Fring", "Ehrmantraut", "Schrader", "Wexler"] 57 | 58 | return f"{random.choice(first_names)} {random.choice(last_names)}" 59 | 60 | def random_sentence(): 61 | """Generate a random bullshit sentence.""" 62 | subjects = ["Our company", "The team", "This product", "The algorithm", "Our platform", "The API", 63 | "Our solution", "The dashboard", "Our methodology", "The framework", "This breakthrough"] 64 | 65 | verbs = ["leverages", "utilizes", "implements", "optimizes", "integrates", "streamlines", "facilitates", 66 | "enables", "empowers", "revolutionizes", "disrupts", "transforms", "synergizes with"] 67 | 68 | adjectives = ["cutting-edge", "next-generation", "state-of-the-art", "innovative", "advanced", 69 | "robust", "scalable", "agile", "dynamic", "intuitive", "seamless", "bleeding-edge"] 70 | 71 | nouns = ["blockchain", "AI", "machine learning", "cloud computing", "big data", "IoT", "microservices", 72 | "neural networks", "quantum computing", "edge computing", "digital transformation", "DevOps"] 73 | 74 | benefits = ["increasing efficiency", "maximizing ROI", "driving growth", "boosting productivity", 75 | "enhancing performance", "reducing overhead", "optimizing workflows", "minimizing downtime", 76 | "accelerating innovation", "enabling scalability", "facilitating collaboration"] 77 | 78 
| return f"{random.choice(subjects)} {random.choice(verbs)} {random.choice(adjectives)} {random.choice(nouns)} for {random.choice(benefits)}." 79 | 80 | def generate_bullshit_data(num_rows=1000): 81 | """Generate a DataFrame of complete bullshit data.""" 82 | print(f"Generating {num_rows} rows of absolute bullshit...") 83 | 84 | # Generate random data 85 | data = { 86 | "id": list(range(1, num_rows + 1)), 87 | "name": [random_name() for _ in range(num_rows)], 88 | "email": [], # Will fill after generating names 89 | "company": [random_company_name() for _ in range(num_rows)], 90 | "job_title": [random_bullshit_job_title() for _ in range(num_rows)], 91 | "salary": np.random.normal(150000, 50000, num_rows).astype(int), # Ridiculously high tech salaries 92 | "bullshit_factor": np.random.randint(1, 11, num_rows), # On a scale of 1-10 93 | "ip_address": [random_ip() for _ in range(num_rows)], 94 | "created_at": [(datetime.now() - timedelta(days=random.randint(0, 365 * 3))).strftime('%Y-%m-%d %H:%M:%S') for _ in range(num_rows)], 95 | "last_active": [(datetime.now() - timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d %H:%M:%S') for _ in range(num_rows)], 96 | "account_status": np.random.choice(['active', 'inactive', 'suspended', 'pending'], num_rows, p=[0.7, 0.1, 0.1, 0.1]), 97 | "login_count": np.random.randint(1, 1000, num_rows), 98 | "buzzword_quota": np.random.randint(5, 100, num_rows), 99 | "bullshit_statement": [random_sentence() for _ in range(num_rows)], 100 | "favorite_framework": np.random.choice(['React', 'Angular', 'Vue', 'Svelte', 'Django', 'Flask', 'Spring', 'Rails'], num_rows), 101 | "preferred_language": np.random.choice(['Python', 'JavaScript', 'Java', 'C#', 'Go', 'Rust', 'TypeScript', 'Ruby'], num_rows), 102 | "coffee_consumption": np.random.randint(1, 10, num_rows), # Cups per day 103 | "meeting_hours": np.random.randint(0, 40, num_rows), # Hours per week 104 | "actual_work_hours": np.random.randint(0, 40, num_rows), # Hours per week 105 | 
"bugs_created": np.random.randint(0, 100, num_rows), 106 | "bugs_fixed": [], # Will calculate after bugs_created 107 | "productivity_score": np.random.rand(num_rows) * 100, 108 | "gitlab_commits": np.random.negative_binomial(5, 0.5, num_rows), # Most people commit very little 109 | "stackoverflow_reputation": np.random.exponential(1000, num_rows).astype(int), 110 | "random_float": np.random.rand(num_rows) * 100, 111 | "boolean_flag": np.random.choice([True, False], num_rows), 112 | "enum_field": np.random.choice(['Option A', 'Option B', 'Option C', 'Option D'], num_rows), 113 | "null_percentage": np.random.rand(num_rows) * 100, 114 | } 115 | 116 | # Generate dependent fields 117 | for i in range(num_rows): 118 | # Email based on name 119 | data["email"].append(random_email(data["name"][i])) 120 | 121 | # Bugs fixed is some percentage of bugs created 122 | fix_rate = random.uniform(0.5, 1.2) # Sometimes they fix more bugs than they create! 123 | data["bugs_fixed"].append(int(data["bugs_created"][i] * fix_rate)) 124 | 125 | # Create DataFrame 126 | df = pd.DataFrame(data) 127 | 128 | # Add some NULL values for realism 129 | for col in df.columns: 130 | if col != 'id': # Keep id intact 131 | null_mask = np.random.random(num_rows) < 0.05 # 5% chance of NULL 132 | df.loc[null_mask, col] = None 133 | 134 | return df 135 | 136 | def main(): 137 | """Main function to create and save the bullshit data.""" 138 | # Create data directory if it doesn't exist 139 | data_dir = "data" 140 | os.makedirs(data_dir, exist_ok=True) 141 | 142 | # Generate bullshit data 143 | df = generate_bullshit_data(num_rows=10000) # 10,000 rows of pure nonsense 144 | 145 | # Save as parquet 146 | parquet_path = os.path.join(data_dir, "bullshit_data.parquet") 147 | df.to_parquet(parquet_path, index=False) 148 | print(f"Saved bullshit data to {parquet_path}") 149 | 150 | # Print some sample data 151 | print("\nSample of the bullshit data:") 152 | print(df.head()) 153 | 154 | # Print column info 155 | 
print("\nColumn data types:") 156 | print(df.dtypes) 157 | 158 | # Print basic stats 159 | print("\nBasic statistics:") 160 | print(df.describe()) 161 | 162 | # Also save as CSV for easy inspection 163 | csv_path = os.path.join(data_dir, "bullshit_data.csv") 164 | df.to_csv(csv_path, index=False) 165 | print(f"Also saved as CSV to {csv_path} for easy inspection") 166 | 167 | if __name__ == "__main__": 168 | main() ``` -------------------------------------------------------------------------------- /test_llm_api.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Test script for the LLM API endpoint in the Trino MCP server. 4 | 5 | This script tests the various endpoints of the API server to verify functionality. 6 | """ 7 | 8 | import json 9 | import requests 10 | from rich.console import Console 11 | from rich.table import Table 12 | from typing import Dict, Any, List, Optional 13 | 14 | # Configuration 15 | API_HOST = "localhost" 16 | API_PORT = 9097 17 | API_BASE_URL = f"http://{API_HOST}:{API_PORT}" 18 | 19 | console = Console() 20 | 21 | def test_endpoint(url: str, method: str = "GET", data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 22 | """Test an endpoint and return the response with detailed info.""" 23 | console.print(f"\n[bold blue]Testing {method} {url}[/bold blue]") 24 | 25 | try: 26 | if method.upper() == "GET": 27 | response = requests.get(url, timeout=5) 28 | elif method.upper() == "POST": 29 | response = requests.post(url, json=data, timeout=5) 30 | else: 31 | console.print(f"[bold red]Unsupported method: {method}[/bold red]") 32 | return {"success": False, "status_code": 0, "error": f"Unsupported method: {method}"} 33 | 34 | status_color = "green" if response.status_code < 400 else "red" 35 | console.print(f"Status: [bold {status_color}]{response.status_code} - {response.reason}[/bold {status_color}]") 36 | 37 | # Try to parse response as JSON 38 | try: 39 | 
            data = response.json()
            console.print("Response data:")
            console.print(json.dumps(data, indent=2))
            return {
                "success": response.status_code < 400,
                "status_code": response.status_code,
                "data": data
            }
        except ValueError:
            # Non-JSON body: fall back to (truncated) raw text.
            console.print(f"Response text: {response.text[:500]}")
            return {
                "success": response.status_code < 400,
                "status_code": response.status_code,
                "text": response.text[:500]
            }

    except Exception as e:
        # Network/transport failure: report it rather than raising.
        console.print(f"[bold red]Error: {str(e)}[/bold red]")
        return {"success": False, "error": str(e)}

def discover_all_endpoints() -> None:
    """Discover available endpoints by trying common paths."""
    console.print("[bold yellow]Discovering endpoints...[/bold yellow]")

    # Common endpoints to check
    endpoints = [
        "/",
        "/api",
        "/docs",
        "/redoc",
        "/openapi.json",
        "/health",
        "/api/query",
        "/query"
    ]

    results = []
    for endpoint in endpoints:
        url = f"{API_BASE_URL}{endpoint}"
        result = test_endpoint(url)
        results.append({
            "endpoint": endpoint,
            "status": result["status_code"] if "status_code" in result else "Error",
            "success": result.get("success", False)
        })

    # Display results in a table
    table = Table(title="API Endpoint Discovery Results")
    table.add_column("Endpoint", style="cyan")
    table.add_column("Status", style="magenta")
    table.add_column("Result", style="green")

    for result in results:
        status = str(result["status"])
        status_style = "green" if result["success"] else "red"
        result_text = "✅ Available" if result["success"] else "❌ Not Available"
        table.add_row(result["endpoint"], f"[{status_style}]{status}[/{status_style}]", result_text)

    console.print(table)

def test_api_docs() -> bool:
    """Test the API documentation endpoint.

    Returns True when either /docs or /openapi.json is reachable.
    """
    console.print("\n[bold yellow]Testing API documentation...[/bold yellow]")

    # Try the /docs endpoint first
    url = f"{API_BASE_URL}/docs"
    result = test_endpoint(url)

    if result.get("success", False):
        console.print("[bold green]API documentation is available at /docs[/bold green]")
        return True
    else:
        console.print("[bold red]API documentation not available at /docs[/bold red]")

        # Try the OpenAPI JSON endpoint
        url = f"{API_BASE_URL}/openapi.json"
        result = test_endpoint(url)

        if result.get("success", False):
            console.print("[bold green]OpenAPI spec is available at /openapi.json[/bold green]")

            # Try to extract query endpoint from the spec
            if "data" in result:
                try:
                    paths = result["data"].get("paths", {})
                    for path, methods in paths.items():
                        if "post" in methods and ("/query" in path or "/api/query" in path):
                            console.print(f"[bold green]Found query endpoint in OpenAPI spec: {path}[/bold green]")
                            return True
                except:
                    console.print("[bold red]Failed to parse OpenAPI spec[/bold red]")

            return True
        else:
            console.print("[bold red]OpenAPI spec not available[/bold red]")
            return False

def test_valid_query() -> bool:
    """Test a valid SQL query against the API."""
    console.print("\n[bold yellow]Testing valid SQL query...[/bold yellow]")

    # Try multiple potential query endpoints
    query_payload = {
        "query": "SELECT 1 AS test",
        "catalog": "memory",
        "schema": "default"
    }

    endpoints = ["/api/query", "/query"]

    for endpoint in endpoints:
        url = f"{API_BASE_URL}{endpoint}"
        console.print(f"[bold]Trying endpoint: {endpoint}[/bold]")

        result = test_endpoint(url, method="POST", data=query_payload)

        if result.get("success", False):
            console.print(f"[bold green]Successfully executed query at {endpoint}[/bold green]")

            # Display results if available
            if "data" in result and "results" in result["data"]:
                display_query_results(result["data"]["results"])

            return True

    console.print("[bold red]Failed to execute query on any endpoint[/bold red]")
    return False

def test_invalid_query() -> bool:
    """Test an invalid SQL query to check error handling."""
    console.print("\n[bold yellow]Testing invalid SQL query (error handling)...[/bold yellow]")

    query_payload = {
        "query": "SELECT * FROM nonexistent_table",
        "catalog": "memory",
        "schema": "default"
    }

    # Try the same endpoints as for valid query
    endpoints = ["/api/query", "/query"]

    for endpoint in endpoints:
        url = f"{API_BASE_URL}{endpoint}"
        console.print(f"[bold]Trying endpoint: {endpoint}[/bold]")

        result = test_endpoint(url, method="POST", data=query_payload)

        # Check if we got a proper error response (should be 400 Bad Request)
        if "status_code" in result and result["status_code"] == 400:
            console.print(f"[bold green]API correctly rejected invalid query at {endpoint} with 400 status[/bold green]")
            return True

    console.print("[bold red]Failed to properly handle invalid query on any endpoint[/bold red]")
    return False

def display_query_results(results: Dict[str, Any]) -> None:
    """Display query results in a formatted table."""
    if not results or "rows" not in results or not results["rows"]:
        console.print("[italic yellow]No results returned[/italic yellow]")
        return

    table = Table(title="Query Results")

    # Add columns to the table
    for column in results.get("columns", []):
        table.add_column(column)

    # Add data rows — rows may arrive as dicts keyed by column or as plain lists.
    for row in results.get("rows", []):
        if isinstance(row, dict):
            table.add_row(*[str(row.get(col, "")) for col in results.get("columns", [])])
        elif
isinstance(row, list): 211 | table.add_row(*[str(val) for val in row]) 212 | 213 | console.print(table) 214 | console.print(f"[italic]Total rows: {results.get('row_count', len(results.get('rows', [])))}[/italic]") 215 | if "execution_time_ms" in results: 216 | console.print(f"[italic]Execution time: {results['execution_time_ms']} ms[/italic]") 217 | 218 | def main() -> None: 219 | """Run all tests.""" 220 | console.print("[bold green]=== Trino MCP LLM API Test ===\n[/bold green]") 221 | 222 | # First discover all available endpoints 223 | discover_all_endpoints() 224 | 225 | # Test API documentation 226 | docs_available = test_api_docs() 227 | 228 | # Only test queries if docs are available 229 | if docs_available: 230 | test_valid_query() 231 | test_invalid_query() 232 | else: 233 | console.print("[bold red]Skipping query tests as API documentation is not available[/bold red]") 234 | 235 | console.print("\n[bold green]=== Test completed ===\n[/bold green]") 236 | 237 | if __name__ == "__main__": 238 | main() ``` -------------------------------------------------------------------------------- /src/trino_mcp/trino_client.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Trino client wrapper for interacting with Trino. 3 | """ 4 | from __future__ import annotations 5 | 6 | import time 7 | from dataclasses import dataclass 8 | from typing import Any, Dict, List, Optional, Tuple, Union 9 | 10 | import trino 11 | from loguru import logger 12 | 13 | from trino_mcp.config import TrinoConfig 14 | 15 | 16 | @dataclass 17 | class TrinoQueryResult: 18 | """A class to represent the result of a Trino query.""" 19 | query_id: str 20 | columns: List[str] 21 | rows: List[List[Any]] 22 | query_time_ms: float 23 | row_count: int 24 | 25 | 26 | class TrinoClient: 27 | """ 28 | A wrapper around the trino-python client to interact with Trino. 
29 | """ 30 | 31 | def __init__(self, config: TrinoConfig): 32 | """ 33 | Initialize the Trino client. 34 | 35 | Args: 36 | config: Trino connection configuration. 37 | """ 38 | self.config = config 39 | self.conn = None 40 | self.current_catalog = config.catalog 41 | self.current_schema = config.schema 42 | 43 | def connect(self) -> None: 44 | """ 45 | Connect to the Trino server. 46 | 47 | This will connect to Trino with the catalog parameter if provided. 48 | """ 49 | logger.info(f"Connecting to Trino at {self.config.host}:{self.config.port}") 50 | 51 | # Create connection params including catalog from config 52 | conn_params = self.config.connection_params 53 | 54 | # Connect to Trino with proper parameters 55 | self.conn = trino.dbapi.connect(**conn_params) 56 | 57 | def disconnect(self) -> None: 58 | """ 59 | Disconnect from the Trino server. 60 | """ 61 | if self.conn: 62 | logger.info("Disconnecting from Trino") 63 | self.conn.close() 64 | self.conn = None 65 | 66 | def ensure_connection(self) -> None: 67 | """ 68 | Ensure that the client is connected to Trino. 69 | """ 70 | if not self.conn: 71 | self.connect() 72 | 73 | def execute_query( 74 | self, 75 | sql: str, 76 | catalog: Optional[str] = None, 77 | schema: Optional[str] = None 78 | ) -> TrinoQueryResult: 79 | """ 80 | Execute a SQL query against Trino. 81 | 82 | Important note on catalog handling: This method properly sets the catalog by updating 83 | the connection parameters, rather than using unreliable "USE catalog" statements. The catalog 84 | is passed directly to the connection, which is more reliable than SQL-based catalog switching. 85 | 86 | Args: 87 | sql: The SQL query to execute. 88 | catalog: Optional catalog name to use for the query. 89 | schema: Optional schema name to use for the query. 90 | 91 | Returns: 92 | TrinoQueryResult: The result of the query. 
93 | """ 94 | # If we're switching catalogs or don't have a connection, we need to reconnect 95 | use_catalog = catalog or self.current_catalog 96 | 97 | if self.conn and (use_catalog != self.current_catalog): 98 | logger.info(f"Switching catalog from {self.current_catalog} to {use_catalog}, reconnecting...") 99 | self.disconnect() 100 | 101 | # Update current catalog and schema 102 | self.current_catalog = use_catalog 103 | if schema: 104 | self.current_schema = schema 105 | 106 | # Update the config catalog before connecting 107 | if use_catalog: 108 | self.config.catalog = use_catalog 109 | 110 | # Ensure connection with updated catalog 111 | self.ensure_connection() 112 | 113 | # Create a cursor 114 | cursor = self.conn.cursor() 115 | 116 | # If we have a schema, try to set it 117 | # This still uses a USE statement, but catalogs are now set in the connection 118 | if self.current_schema: 119 | try: 120 | logger.debug(f"Setting schema to {self.current_schema}") 121 | 122 | # Make sure to include catalog with schema to avoid errors 123 | if self.current_catalog: 124 | cursor.execute(f"USE {self.current_catalog}.{self.current_schema}") 125 | else: 126 | logger.warning("Cannot set schema without catalog") 127 | except Exception as e: 128 | logger.warning(f"Failed to set schema: {e}") 129 | 130 | try: 131 | # Execute the query and time it 132 | logger.debug(f"Executing query: {sql}") 133 | start_time = time.time() 134 | cursor.execute(sql) 135 | query_time = time.time() - start_time 136 | 137 | # Fetch the query ID, metadata and results 138 | query_id = cursor.stats.get("queryId", "unknown") 139 | columns = [desc[0] for desc in cursor.description] if cursor.description else [] 140 | rows = cursor.fetchall() if cursor.description else [] 141 | 142 | return TrinoQueryResult( 143 | query_id=query_id, 144 | columns=columns, 145 | rows=rows, 146 | query_time_ms=query_time * 1000, 147 | row_count=len(rows) 148 | ) 149 | except Exception as e: 150 | logger.error(f"Query 
execution failed: {e}") 151 | raise 152 | 153 | def get_catalogs(self) -> List[Dict[str, str]]: 154 | """ 155 | Get a list of all catalogs in Trino. 156 | 157 | Returns: 158 | List[Dict[str, str]]: A list of catalog metadata. 159 | """ 160 | result = self.execute_query("SHOW CATALOGS") 161 | return [{"name": row[0]} for row in result.rows] 162 | 163 | def get_schemas(self, catalog: str) -> List[Dict[str, str]]: 164 | """ 165 | Get a list of all schemas in a catalog. 166 | 167 | Args: 168 | catalog: The catalog name. 169 | 170 | Returns: 171 | List[Dict[str, str]]: A list of schema metadata. 172 | """ 173 | result = self.execute_query(f"SHOW SCHEMAS FROM {catalog}", catalog=catalog) 174 | return [{"name": row[0], "catalog": catalog} for row in result.rows] 175 | 176 | def get_tables(self, catalog: str, schema: str) -> List[Dict[str, str]]: 177 | """ 178 | Get a list of all tables in a schema. 179 | 180 | Args: 181 | catalog: The catalog name. 182 | schema: The schema name. 183 | 184 | Returns: 185 | List[Dict[str, str]]: A list of table metadata. 186 | """ 187 | result = self.execute_query(f"SHOW TABLES FROM {catalog}.{schema}", catalog=catalog, schema=schema) 188 | return [{"name": row[0], "catalog": catalog, "schema": schema} for row in result.rows] 189 | 190 | def get_columns(self, catalog: str, schema: str, table: str) -> List[Dict[str, Any]]: 191 | """ 192 | Get a list of all columns in a table. 193 | 194 | Args: 195 | catalog: The catalog name. 196 | schema: The schema name. 197 | table: The table name. 198 | 199 | Returns: 200 | List[Dict[str, Any]]: A list of column metadata. 
201 | """ 202 | result = self.execute_query( 203 | f"DESCRIBE {catalog}.{schema}.{table}", 204 | catalog=catalog, 205 | schema=schema 206 | ) 207 | columns = [] 208 | 209 | for row in result.rows: 210 | columns.append({ 211 | "name": row[0], 212 | "type": row[1], 213 | "extra": row[2] if len(row) > 2 else None, 214 | "catalog": catalog, 215 | "schema": schema, 216 | "table": table 217 | }) 218 | 219 | return columns 220 | 221 | def get_table_details(self, catalog: str, schema: str, table: str) -> Dict[str, Any]: 222 | """ 223 | Get detailed information about a table including columns and statistics. 224 | 225 | Args: 226 | catalog: The catalog name. 227 | schema: The schema name. 228 | table: The table name. 229 | 230 | Returns: 231 | Dict[str, Any]: Detailed table information. 232 | """ 233 | columns = self.get_columns(catalog, schema, table) 234 | 235 | # Get table statistics if available (might not be supported by all connectors) 236 | try: 237 | stats_query = f""" 238 | SELECT * FROM {catalog}.information_schema.tables 239 | WHERE table_catalog = '{catalog}' 240 | AND table_schema = '{schema}' 241 | AND table_name = '{table}' 242 | """ 243 | stats_result = self.execute_query(stats_query, catalog=catalog) 244 | stats = {} 245 | 246 | if stats_result.rows: 247 | row = stats_result.rows[0] 248 | for i, col in enumerate(stats_result.columns): 249 | stats[col.lower()] = row[i] 250 | except Exception as e: 251 | logger.warning(f"Failed to get table statistics: {e}") 252 | stats = {} 253 | 254 | return { 255 | "name": table, 256 | "catalog": catalog, 257 | "schema": schema, 258 | "columns": columns, 259 | "statistics": stats 260 | } 261 | 262 | def cancel_query(self, query_id: str) -> bool: 263 | """ 264 | Cancel a running query. 265 | 266 | Args: 267 | query_id: The ID of the query to cancel. 268 | 269 | Returns: 270 | bool: True if the query was successfully canceled, False otherwise. 
271 | """ 272 | self.ensure_connection() 273 | 274 | try: 275 | # Use system procedures to cancel the query 276 | self.execute_query(f"CALL system.runtime.kill_query(query_id => '{query_id}')") 277 | return True 278 | except Exception as e: 279 | logger.error(f"Failed to cancel query {query_id}: {e}") 280 | return False 281 | ``` -------------------------------------------------------------------------------- /test_bullshit_query.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Test script to query our bullshit data through the MCP server. 4 | This demonstrates that our fix for the catalog handling works by running a complex query. 5 | """ 6 | import json 7 | import subprocess 8 | import sys 9 | import time 10 | 11 | def test_bullshit_query(): 12 | """Run a query against our bullshit data using the MCP STDIO transport.""" 13 | print("🚀 Testing Bullshit Data with MCP STDIO Transport") 14 | 15 | # Start the MCP server with STDIO transport 16 | cmd = [ 17 | "docker", "exec", "-i", "trino_mcp_trino-mcp_1", 18 | "python", "-m", "trino_mcp.server", 19 | "--transport", "stdio", 20 | "--debug", 21 | "--trino-host", "trino", 22 | "--trino-port", "8080", 23 | "--trino-user", "trino", 24 | "--trino-catalog", "memory" 25 | ] 26 | 27 | try: 28 | print("Starting MCP server process with STDIO transport...") 29 | process = subprocess.Popen( 30 | cmd, 31 | stdin=subprocess.PIPE, 32 | stdout=subprocess.PIPE, 33 | stderr=subprocess.PIPE, 34 | text=True, 35 | bufsize=1 # Line buffered 36 | ) 37 | 38 | # Sleep to let the server initialize 39 | time.sleep(2) 40 | 41 | # Helper function to send a request and get a response 42 | def send_request(request, expect_response=True): 43 | """ 44 | Send a request to the MCP server and get the response. 
45 | 46 | Args: 47 | request: The JSON-RPC request to send 48 | expect_response: Whether to wait for a response 49 | 50 | Returns: 51 | The JSON-RPC response, or None if no response is expected 52 | """ 53 | request_str = json.dumps(request) + "\n" 54 | print(f"\n📤 Sending: {request_str.strip()}") 55 | 56 | try: 57 | process.stdin.write(request_str) 58 | process.stdin.flush() 59 | except BrokenPipeError: 60 | print("❌ Broken pipe - server has closed the connection") 61 | return None 62 | 63 | if not expect_response: 64 | print("✅ Sent notification (no response expected)") 65 | return None 66 | 67 | # Read the response 68 | print("Waiting for response...") 69 | try: 70 | response_str = process.stdout.readline() 71 | if response_str: 72 | print(f"📩 Received response") 73 | return json.loads(response_str) 74 | else: 75 | print("❌ No response received") 76 | return None 77 | except Exception as e: 78 | print(f"❌ Error reading response: {e}") 79 | return None 80 | 81 | # ===== STEP 1: Initialize MCP ===== 82 | print("\n===== STEP 1: Initialize MCP =====") 83 | initialize_request = { 84 | "jsonrpc": "2.0", 85 | "id": 1, 86 | "method": "initialize", 87 | "params": { 88 | "protocolVersion": "2024-11-05", 89 | "clientInfo": { 90 | "name": "bullshit-data-query-test", 91 | "version": "1.0.0" 92 | }, 93 | "capabilities": { 94 | "tools": True, 95 | "resources": { 96 | "supportedSources": ["trino://catalog"] 97 | } 98 | } 99 | } 100 | } 101 | 102 | init_response = send_request(initialize_request) 103 | if not init_response: 104 | print("❌ Failed to initialize MCP - exiting test") 105 | return 106 | 107 | # Print server info 108 | if "result" in init_response and "serverInfo" in init_response["result"]: 109 | server_info = init_response["result"]["serverInfo"] 110 | print(f"✅ Connected to server: {server_info.get('name')} {server_info.get('version')}") 111 | 112 | # ===== STEP 2: Send initialized notification ===== 113 | print("\n===== STEP 2: Send initialized notification 
=====") 114 | initialized_notification = { 115 | "jsonrpc": "2.0", 116 | "method": "notifications/initialized", 117 | "params": {} 118 | } 119 | 120 | send_request(initialized_notification, expect_response=False) 121 | 122 | # ===== STEP 3: Query the bullshit data ===== 123 | print("\n===== STEP 3: Query the Bullshit Data =====") 124 | query_request = { 125 | "jsonrpc": "2.0", 126 | "id": 2, 127 | "method": "tools/call", 128 | "params": { 129 | "name": "execute_query", 130 | "arguments": { 131 | "sql": """ 132 | SELECT 133 | job_title, 134 | COUNT(*) as count, 135 | AVG(salary) as avg_salary, 136 | MAX(salary) as max_salary, 137 | AVG(bullshit_factor) as avg_bs_factor 138 | FROM 139 | memory.bullshit.real_bullshit_data 140 | WHERE 141 | salary > 150000 142 | GROUP BY 143 | job_title 144 | HAVING 145 | AVG(bullshit_factor) > 5 146 | ORDER BY 147 | avg_salary DESC 148 | LIMIT 10 149 | """, 150 | "catalog": "memory", 151 | "schema": "bullshit" 152 | } 153 | } 154 | } 155 | 156 | query_response = send_request(query_request) 157 | if not query_response or "error" in query_response: 158 | if "error" in query_response: 159 | print(f"❌ Query error: {json.dumps(query_response.get('error', {}), indent=2)}") 160 | else: 161 | print("❌ Failed to execute query") 162 | else: 163 | print(f"✅ Bullshit query executed successfully!") 164 | 165 | # Parse the nested JSON in the content field 166 | try: 167 | content_text = query_response.get("result", {}).get("content", [{}])[0].get("text", "{}") 168 | result_data = json.loads(content_text) 169 | 170 | # Now we have the actual query result 171 | columns = result_data.get("columns", []) 172 | row_count = result_data.get("row_count", 0) 173 | preview_rows = result_data.get("preview_rows", []) 174 | 175 | print(f"\nColumns: {', '.join(columns)}") 176 | print(f"Row count: {row_count}") 177 | print("\n🏆 TOP 10 BULLSHIT JOBS (high salary, high BS factor):") 178 | print("-" * 100) 179 | 180 | # Print header with nice formatting 181 | header 
= " | ".join(f"{col.upper():20}" for col in columns) 182 | print(header) 183 | print("-" * 100) 184 | 185 | # Print rows with nice formatting 186 | for row in preview_rows: 187 | row_str = [] 188 | for col in columns: 189 | value = row.get(col, "N/A") 190 | if isinstance(value, float): 191 | row_str.append(f"{value:20.2f}") 192 | else: 193 | row_str.append(f"{str(value):20}") 194 | print(" | ".join(row_str)) 195 | 196 | except json.JSONDecodeError: 197 | print(f"Error parsing result content: {query_response}") 198 | except Exception as e: 199 | print(f"Error processing result: {e}") 200 | print(f"Raw result: {json.dumps(query_response.get('result', {}), indent=2)}") 201 | 202 | # ===== STEP 4: List Available Schemas ===== 203 | print("\n===== STEP 4: List Available Schemas in Memory Catalog =====") 204 | schema_query = { 205 | "jsonrpc": "2.0", 206 | "id": 3, 207 | "method": "tools/call", 208 | "params": { 209 | "name": "execute_query", 210 | "arguments": { 211 | "sql": "SHOW SCHEMAS FROM memory", 212 | "catalog": "memory" 213 | } 214 | } 215 | } 216 | 217 | schema_response = send_request(schema_query) 218 | if not schema_response or "error" in schema_response: 219 | if "error" in schema_response: 220 | print(f"❌ Schema query error: {json.dumps(schema_response.get('error', {}), indent=2)}") 221 | else: 222 | print("❌ Failed to execute schema query") 223 | else: 224 | print(f"✅ Schema query executed successfully!") 225 | 226 | # Parse the nested JSON in the content field 227 | try: 228 | content_text = schema_response.get("result", {}).get("content", [{}])[0].get("text", "{}") 229 | result_data = json.loads(content_text) 230 | 231 | # Extract schema names 232 | preview_rows = result_data.get("preview_rows", []) 233 | schema_column = result_data.get("columns", ["Schema"])[0] 234 | 235 | print("\n🗂️ Available schemas in memory catalog:") 236 | for row in preview_rows: 237 | schema_name = row.get(schema_column, "unknown") 238 | print(f" - {schema_name}") 239 | 240 | 
except json.JSONDecodeError: 241 | print(f"Error parsing schemas content: {schema_response}") 242 | except Exception as e: 243 | print(f"Error processing schemas: {e}") 244 | print(f"Raw schema result: {json.dumps(schema_response.get('result', {}), indent=2)}") 245 | 246 | print("\n🎉 Test completed successfully!") 247 | 248 | except Exception as e: 249 | print(f"❌ Error: {e}") 250 | finally: 251 | # Make sure to terminate the process 252 | if 'process' in locals() and process.poll() is None: 253 | print("\nTerminating server process...") 254 | process.terminate() 255 | try: 256 | process.wait(timeout=5) 257 | except subprocess.TimeoutExpired: 258 | print("Process didn't terminate, killing it...") 259 | process.kill() 260 | 261 | if __name__ == "__main__": 262 | test_bullshit_query() ``` -------------------------------------------------------------------------------- /scripts/docker_stdio_test.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Minimalist MCP STDIO test to run inside the container. 4 | This script avoids importing any external modules and uses just the Python standard library. 
5 | """ 6 | import json 7 | import subprocess 8 | import time 9 | import sys 10 | import threading 11 | 12 | def test_mcp_stdio(): 13 | """Run a test of the MCP server using STDIO transport inside the container.""" 14 | print("🚀 Testing MCP with STDIO transport (container version)") 15 | 16 | # Start the MCP server with STDIO transport 17 | # We're directly using the module since we're in the container 18 | # Explicitly set the Trino host to trino:8080 19 | server_cmd = [ 20 | "python", "-m", "trino_mcp.server", 21 | "--transport", "stdio", 22 | "--debug", 23 | "--trino-host", "trino", 24 | "--trino-port", "8080", 25 | "--trino-catalog", "memory" 26 | ] 27 | 28 | try: 29 | # Start the server in a subprocess 30 | process = subprocess.Popen( 31 | server_cmd, 32 | stdin=subprocess.PIPE, 33 | stdout=subprocess.PIPE, 34 | stderr=subprocess.PIPE, 35 | text=True, 36 | bufsize=1 # Line buffered 37 | ) 38 | 39 | # Set up a thread to monitor stderr and print it 40 | def print_stderr(): 41 | while True: 42 | line = process.stderr.readline() 43 | if not line: 44 | break 45 | print(f"🔴 SERVER ERROR: {line.strip()}") 46 | 47 | stderr_thread = threading.Thread(target=print_stderr, daemon=True) 48 | stderr_thread.start() 49 | 50 | print("Starting server process...") 51 | # Sleep to allow server to initialize 52 | time.sleep(2) 53 | 54 | # Helper function to send a request and get a response 55 | def send_request(request_data, request_desc="", expect_response=True): 56 | request_json = json.dumps(request_data) + "\n" 57 | print(f"\n📤 Sending {request_desc}: {request_json.strip()}") 58 | 59 | try: 60 | process.stdin.write(request_json) 61 | process.stdin.flush() 62 | except BrokenPipeError: 63 | print(f"❌ Broken pipe when sending {request_desc}") 64 | return None 65 | 66 | # If we don't expect a response (notification), just return 67 | if not expect_response: 68 | print(f"✅ Sent {request_desc} (no response expected)") 69 | return True 70 | 71 | # Read response with timeout 72 | 
print(f"Waiting for {request_desc} response...") 73 | start_time = time.time() 74 | timeout = 10 75 | 76 | while time.time() - start_time < timeout: 77 | # Check if process is still running 78 | if process.poll() is not None: 79 | print(f"Server process exited with code {process.returncode}") 80 | return None 81 | 82 | # Try to read a line from stdout 83 | response_line = process.stdout.readline().strip() 84 | if response_line: 85 | print(f"📩 Received response: {response_line}") 86 | try: 87 | return json.loads(response_line) 88 | except json.JSONDecodeError as e: 89 | print(f"❌ Error parsing response: {e}") 90 | 91 | # Wait a bit before trying again 92 | time.sleep(0.1) 93 | 94 | print(f"⏱️ Timeout waiting for {request_desc} response") 95 | return None 96 | 97 | # STEP 1: Initialize the server 98 | print("\n=== STEP 1: Initialize Server ===") 99 | initialize_request = { 100 | "jsonrpc": "2.0", 101 | "id": 1, 102 | "method": "initialize", 103 | "params": { 104 | "protocolVersion": "2024-11-05", 105 | "clientInfo": { 106 | "name": "container-stdio-test", 107 | "version": "1.0.0" 108 | }, 109 | "capabilities": { 110 | "tools": True, 111 | "resources": { 112 | "supportedSources": ["trino://catalog"] 113 | } 114 | } 115 | } 116 | } 117 | 118 | init_response = send_request(initialize_request, "initialize request") 119 | if not init_response: 120 | raise Exception("Failed to initialize MCP server") 121 | 122 | server_info = init_response.get("result", {}).get("serverInfo", {}) 123 | print(f"✅ Connected to server: {server_info.get('name')} {server_info.get('version')}") 124 | 125 | # STEP 2: Send initialized notification (no response expected) 126 | print("\n=== STEP 2: Send Initialized Notification ===") 127 | initialized_notification = { 128 | "jsonrpc": "2.0", 129 | "method": "notifications/initialized", 130 | "params": {} # Empty params object is required 131 | } 132 | _ = send_request(initialized_notification, "initialized notification", expect_response=False) 133 | 
134 | # STEP 3: List available tools 135 | print("\n=== STEP 3: List Available Tools ===") 136 | tools_request = { 137 | "jsonrpc": "2.0", 138 | "id": 2, 139 | "method": "tools/list" 140 | } 141 | 142 | tools_response = send_request(tools_request, "tools list request") 143 | if not tools_response: 144 | print("❌ Failed to list tools") 145 | else: 146 | tools = tools_response.get("result", {}).get("tools", []) 147 | print(f"✅ Available tools: {len(tools)}") 148 | for tool in tools: 149 | print(f" - {tool.get('name')}: {tool.get('description', 'No description')}") 150 | 151 | # STEP 4: Execute a simple query 152 | if tools_response: 153 | print("\n=== STEP 4: Execute Simple Query ===") 154 | query_request = { 155 | "jsonrpc": "2.0", 156 | "id": 3, 157 | "method": "tools/call", 158 | "params": { 159 | "name": "execute_query", 160 | "arguments": { 161 | "sql": "SELECT 'Hello, world!' AS greeting", 162 | "catalog": "memory" 163 | } 164 | } 165 | } 166 | 167 | query_response = send_request(query_request, "query execution") 168 | if not query_response: 169 | print("❌ Failed to execute query") 170 | elif "error" in query_response: 171 | print(f"❌ Query error: {json.dumps(query_response.get('error', {}), indent=2)}") 172 | else: 173 | result = query_response.get("result", {}) 174 | print(f"✅ Query executed successfully:") 175 | print(f" Columns: {', '.join(result.get('columns', []))}") 176 | print(f" Row count: {result.get('row_count', 0)}") 177 | print(f" Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") 178 | 179 | # STEP 5: Try to query a bullshit table 180 | print("\n=== STEP 5: Query Bullshit Table ===") 181 | bs_query_request = { 182 | "jsonrpc": "2.0", 183 | "id": 4, 184 | "method": "tools/call", 185 | "params": { 186 | "name": "execute_query", 187 | "arguments": { 188 | "sql": "SELECT * FROM memory.bullshit.bullshit_data LIMIT 3", 189 | "catalog": "memory" 190 | } 191 | } 192 | } 193 | 194 | bs_query_response = send_request(bs_query_request, 
"bullshit table query") 195 | if not bs_query_response: 196 | print("❌ Failed to execute bullshit table query") 197 | elif "error" in bs_query_response: 198 | print(f"❌ Query error: {json.dumps(bs_query_response.get('error', {}), indent=2)}") 199 | else: 200 | result = bs_query_response.get("result", {}) 201 | print(f"✅ Bullshit query executed successfully:") 202 | print(f" Columns: {', '.join(result.get('columns', []))}") 203 | print(f" Row count: {result.get('row_count', 0)}") 204 | print(f" Preview rows: {json.dumps(result.get('preview_rows', []), indent=2)}") 205 | 206 | # STEP 6: Try resources listing 207 | print("\n=== STEP 6: List Resources ===") 208 | resources_request = { 209 | "jsonrpc": "2.0", 210 | "id": 5, 211 | "method": "resources/list", 212 | "params": { 213 | "source": "trino://catalog" 214 | } 215 | } 216 | 217 | resources_response = send_request(resources_request, "resources list request") 218 | if not resources_response: 219 | print("❌ Failed to list resources") 220 | elif "error" in resources_response: 221 | print(f"❌ Resources error: {json.dumps(resources_response.get('error', {}), indent=2)}") 222 | else: 223 | resources = resources_response.get("result", {}).get("items", []) 224 | print(f"✅ Available resources: {len(resources)}") 225 | for resource in resources: 226 | print(f" - {resource.get('source')}: {resource.get('path')}") 227 | 228 | # STEP 7: Shutdown 229 | print("\n=== STEP 7: Shutdown ===") 230 | shutdown_request = { 231 | "jsonrpc": "2.0", 232 | "id": 6, 233 | "method": "shutdown" 234 | } 235 | 236 | shutdown_response = send_request(shutdown_request, "shutdown request") 237 | print("✅ Server shutdown request sent") 238 | 239 | # Send exit notification (no response expected) 240 | exit_notification = { 241 | "jsonrpc": "2.0", 242 | "method": "exit", 243 | "params": {} # Empty params may be needed 244 | } 245 | 246 | _ = send_request(exit_notification, "exit notification", expect_response=False) 247 | 248 | except Exception as e: 
249 | print(f"❌ Error: {e}") 250 | finally: 251 | # Make sure to terminate the process 252 | if 'process' in locals() and process.poll() is None: 253 | print("Terminating server process...") 254 | process.terminate() 255 | try: 256 | process.wait(timeout=5) 257 | except subprocess.TimeoutExpired: 258 | print("Process didn't terminate, killing it...") 259 | process.kill() 260 | 261 | print("\n🏁 Test completed!") 262 | 263 | if __name__ == "__main__": 264 | test_mcp_stdio() ```