This is page 4 of 5. Use http://codebase.md/freepeak/db-mcp-server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cm
│   └── gitstream.cm
├── .cursor
│   ├── mcp-example.json
│   ├── mcp.json
│   └── rules
│       └── global.mdc
├── .dockerignore
├── .DS_Store
├── .env.example
├── .github
│   ├── FUNDING.yml
│   └── workflows
│       └── go.yml
├── .gitignore
├── .golangci.yml
├── assets
│   └── logo.svg
├── CHANGELOG.md
├── cmd
│   └── server
│       └── main.go
├── commit-message.txt
├── config.json
├── config.timescaledb-test.json
├── docker-compose.mcp-test.yml
├── docker-compose.test.yml
├── docker-compose.timescaledb-test.yml
├── docker-compose.yml
├── docker-wrapper.sh
├── Dockerfile
├── docs
│   ├── REFACTORING.md
│   ├── TIMESCALEDB_FUNCTIONS.md
│   ├── TIMESCALEDB_IMPLEMENTATION.md
│   ├── TIMESCALEDB_PRD.md
│   └── TIMESCALEDB_TOOLS.md
├── examples
│   └── postgres_connection.go
├── glama.json
├── go.mod
├── go.sum
├── init-scripts
│   └── timescaledb
│       ├── 01-init.sql
│       ├── 02-sample-data.sql
│       ├── 03-continuous-aggregates.sql
│       └── README.md
├── internal
│   ├── config
│   │   ├── config_test.go
│   │   └── config.go
│   ├── delivery
│   │   └── mcp
│   │       ├── compression_policy_test.go
│   │       ├── context
│   │       │   ├── hypertable_schema_test.go
│   │       │   ├── timescale_completion_test.go
│   │       │   ├── timescale_context_test.go
│   │       │   └── timescale_query_suggestion_test.go
│   │       ├── mock_test.go
│   │       ├── response_test.go
│   │       ├── response.go
│   │       ├── retention_policy_test.go
│   │       ├── server_wrapper.go
│   │       ├── timescale_completion.go
│   │       ├── timescale_context.go
│   │       ├── timescale_schema.go
│   │       ├── timescale_tool_test.go
│   │       ├── timescale_tool.go
│   │       ├── timescale_tools_test.go
│   │       ├── tool_registry.go
│   │       └── tool_types.go
│   ├── domain
│   │   └── database.go
│   ├── logger
│   │   ├── logger_test.go
│   │   └── logger.go
│   ├── repository
│   │   └── database_repository.go
│   └── usecase
│       └── database_usecase.go
├── LICENSE
├── Makefile
├── pkg
│   ├── core
│   │   ├── core.go
│   │   └── logging.go
│   ├── db
│   │   ├── db_test.go
│   │   ├── db.go
│   │   ├── manager.go
│   │   ├── README.md
│   │   └── timescale
│   │       ├── config_test.go
│   │       ├── config.go
│   │       ├── connection_test.go
│   │       ├── connection.go
│   │       ├── continuous_aggregate_test.go
│   │       ├── continuous_aggregate.go
│   │       ├── hypertable_test.go
│   │       ├── hypertable.go
│   │       ├── metadata.go
│   │       ├── mocks_test.go
│   │       ├── policy_test.go
│   │       ├── policy.go
│   │       ├── query.go
│   │       ├── timeseries_test.go
│   │       └── timeseries.go
│   ├── dbtools
│   │   ├── db_helpers.go
│   │   ├── dbtools_test.go
│   │   ├── dbtools.go
│   │   ├── exec.go
│   │   ├── performance_test.go
│   │   ├── performance.go
│   │   ├── query.go
│   │   ├── querybuilder_test.go
│   │   ├── querybuilder.go
│   │   ├── README.md
│   │   ├── schema_test.go
│   │   ├── schema.go
│   │   ├── tx_test.go
│   │   └── tx.go
│   ├── internal
│   │   └── logger
│   │       └── logger.go
│   ├── jsonrpc
│   │   └── jsonrpc.go
│   ├── logger
│   │   └── logger.go
│   └── tools
│       └── tools.go
├── README-old.md
├── README.md
├── repomix-output.txt
├── request.json
├── start-mcp.sh
├── test.Dockerfile
├── timescaledb-test.sh
└── wait-for-it.sh
```
# Files
--------------------------------------------------------------------------------
/README-old.md:
--------------------------------------------------------------------------------
```markdown
<div align="center">
# Multi DB MCP Server
[MIT License](https://opensource.org/licenses/MIT)
[Go Report Card](https://goreportcard.com/report/github.com/FreePeak/db-mcp-server)
[Go Reference](https://pkg.go.dev/github.com/FreePeak/db-mcp-server)
[Contributors](https://github.com/FreePeak/db-mcp-server/graphs/contributors)
<h3>A robust multi-database implementation of the Database Model Context Protocol (DB MCP)</h3>
[Features](#key-features) • [AI Benefits](#ai-integration-benefits) • [Installation](#installation) • [Usage](#usage) • [Documentation](#documentation) • [Contributing](#contributing) • [License](#license)
</div>
---
## 📋 Overview
The DB MCP Server is a high-performance implementation of the Database Model Context Protocol designed to revolutionize how AI agents interact with databases. By creating a standardized communication layer between AI models and database systems, it enables AI agents to discover, understand, and manipulate database structures with unprecedented context awareness. It currently supports MySQL and PostgreSQL, with plans to expand to the most widely used databases, including NoSQL solutions. DB MCP Server eliminates the knowledge gap between AI agents and your data, enabling more intelligent, context-aware database operations that previously required human expertise.
## ✨ Key Features
- **AI-Optimized Context Protocol**: Provides rich database context to AI agents, enabling them to reason about schema, relationships, and data patterns
- **Semantic Understanding Bridge**: Translates between natural language queries and database operations with full schema awareness
- **Contextual Database Operations**: Allows AI agents to execute database operations with full understanding of schema, constraints, and relationships
- **Multi-Database Support**: Currently supports MySQL and PostgreSQL with plans for expansion
- **Dynamic Tool Registry**: Register, discover, and invoke database tools at runtime via standard protocol AI agents can understand
- **Editor Integration**: First-class support for VS Code and Cursor extensions with AI-aware features
- **Schema-Aware Assistance**: Provides AI models with complete database structure knowledge for better suggestions
- **Performance Insights**: Delivers performance analytics that AI can leverage for optimization recommendations
## 🧠 AI Integration Benefits
The DB MCP Server transforms how AI agents interact with databases in several key ways:
### Enhanced Contextual Understanding
- **Schema Awareness**: AI agents gain complete knowledge of database tables, columns, relationships, and constraints
- **Semantic Relationship Mapping**: Enables AI to understand not just structure but meaning and purpose of data elements
- **Query Context Preservation**: Maintains context between related operations for coherent multi-step reasoning
### Intelligent Database Operations
- **Natural Language to SQL**: Translates user intent into optimized database operations with full schema awareness
- **Context-Aware Query Generation**: Creates queries that respect database structure, types, and relationships
- **Error Prevention**: Understands database constraints before execution, preventing common errors
- **Optimization Suggestions**: Provides AI with execution metrics for intelligent query improvement recommendations
### Workflow Optimization
- **Reduced Context Window Usage**: Efficiently provides database structure without consuming AI token context
- **Operation Chaining**: Enables complex multi-step operations with persistent context
- **Intelligent Defaults**: Suggests appropriate actions based on database structure and common patterns
- **Progressive Disclosure**: Reveals database complexity progressively as needed by the AI agent
## 🚀 Installation
### Prerequisites
- Go 1.18 or later
- Supported databases:
  - MySQL
  - PostgreSQL
  - (Additional databases in roadmap)
- Docker (optional, for containerized deployment)
### Quick Start
```bash
# Clone the repository
git clone https://github.com/FreePeak/db-mcp-server.git
cd db-mcp-server
# Copy and configure environment variables
cp .env.example .env
# Edit .env with your configuration
# Option 1: Build and run locally with SSE transport (default)
make build
./mcp-server
# Option 2: Build and run with STDIO transport
make build
./mcp-server -t stdio
# Option 3: Using Docker
docker build -t db-mcp-server .
docker run -p 9090:9090 db-mcp-server
# Option 4: Using Docker Compose (with MySQL)
docker-compose up -d
```
### Transport Modes
The server supports two transport modes:
1. **SSE (Server-Sent Events)** - Default mode for browser and HTTP clients

   ```bash
   ./mcp-server -t sse
   ```

2. **STDIO (Standard Input/Output)** - For command-line tools and integrations

   ```bash
   ./mcp-server -t stdio
   ```
For STDIO mode, see the [examples directory](./examples) for usage examples.
### Docker
```bash
# Build the Docker image
docker build -t db-mcp-server .
# Run the container
docker run -p 9090:9090 db-mcp-server
# Run with custom configuration
docker run -p 8080:8080 \
-e SERVER_PORT=8080 \
-e LOG_LEVEL=debug \
-e DB_TYPE=mysql \
-e DB_HOST=my-database-server \
db-mcp-server
# Run with Docker Compose (includes MySQL database)
docker-compose up -d
```
## 🔧 Configuration
DB MCP Server can be configured via environment variables or a `.env` file:
| Variable | Description | Default |
|----------|-------------|---------|
| `SERVER_PORT` | Server port | `9092` |
| `TRANSPORT_MODE` | Transport mode (stdio, sse) | `stdio` |
| `LOG_LEVEL` | Logging level (debug, info, warn, error) | `debug` |
| `DB_TYPE` | Database type (mysql, postgres) | `mysql` |
| `DB_HOST` | Database host | `localhost` |
| `DB_PORT` | Database port | `3306` |
| `DB_USER` | Database username | `iamrevisto` |
| `DB_PASSWORD` | Database password | `password` |
| `DB_NAME` | Database name | `revisto` |
| `DB_ROOT_PASSWORD` | Database root password (for container setup) | `root_password` |
See `.env.example` for more configuration options.
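For reference, the legacy single-connection fallback assembles its configuration from these variables. The sketch below mirrors that logic from `pkg/dbtools/dbtools.go`; the helper name `connectionFromEnv` is illustrative, not part of the package API:

```go
package dbtools

import (
	"os"
	"strconv"
)

// connectionFromEnv (hypothetical helper) builds a single ConnectionConfig
// from the DB_* environment variables, defaulting to MySQL on port 3306.
func connectionFromEnv() ConnectionConfig {
	dbType := os.Getenv("DB_TYPE")
	if dbType == "" {
		dbType = "mysql"
	}
	port, err := strconv.Atoi(os.Getenv("DB_PORT"))
	if err != nil || port == 0 {
		port = 3306
	}
	return ConnectionConfig{
		ID:       "default",
		Type:     DatabaseType(dbType),
		Host:     os.Getenv("DB_HOST"),
		Port:     port,
		Name:     os.Getenv("DB_NAME"),
		User:     os.Getenv("DB_USER"),
		Password: os.Getenv("DB_PASSWORD"),
	}
}
```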
## 📖 Usage
### Integrating with Cursor Edit and AI Agents
DB MCP Server creates a powerful bridge between your databases and AI assistants in Cursor Edit, enabling AI-driven database operations with full context awareness. Configure your Cursor settings in `.cursor/mcp.json`:
```json
{
"mcpServers": {
"db-mcp-server": {
"url": "http://localhost:9090/sse"
}
}
}
```
To leverage AI-powered database operations:
1. Configure and start the DB MCP Server using one of the installation methods above
2. Add the configuration to your Cursor settings
3. Open Cursor and navigate to a SQL or code file
4. The AI assistant now has access to your database schema, relationships, and capabilities
5. Ask the AI to generate, explain, or optimize database queries with full schema awareness
6. Execute AI-generated queries directly from Cursor
The MCP Server enhances AI assistant capabilities with:
- Complete database schema understanding
- Relationship-aware query generation
- Intelligent query optimization recommendations
- Error prevention through constraint awareness
- Performance metrics for better suggestions
- Context persistence across multiple operations
### Example AI Interactions
```
# Ask the AI for schema information
"What tables are in the database and how are they related?"
# Request query generation with context
"Create a query to find all orders from customers in California with items over $100"
# Get optimization suggestions
"How can I optimize this query that's taking too long to execute?"
# Request complex data operations
"Help me create a transaction that updates inventory levels when an order is placed"
```
## 📚 Documentation
### DB MCP Protocol for AI Integration
The server implements the DB MCP protocol with methods specifically designed to enhance AI agent capabilities:
- **initialize**: Sets up the session, transmits schema context, and returns server capabilities
- **tools/list**: Enables AI agents to discover available database tools dynamically
- **tools/call**: Allows AI to execute database tools with full context
- **editor/context**: Updates the server with editor context for better AI awareness
- **schema/explore**: Provides AI with detailed database structure information
- **cancel**: Cancels an in-progress operation
For full protocol documentation, visit the [MCP Specification](https://github.com/microsoft/mcp) and our database-specific extensions for AI integration.
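As a concrete illustration, a `tools/call` request uses the standard JSON-RPC 2.0 envelope with the tool name and arguments under `params`. This minimal Go sketch marshals one such request; the envelope shape follows the MCP specification linked above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonRPCRequest models a standard JSON-RPC 2.0 envelope.
type jsonRPCRequest struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      int         `json:"id"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params"`
}

func main() {
	req := jsonRPCRequest{
		JSONRPC: "2.0",
		ID:      1,
		Method:  "tools/call",
		Params: map[string]interface{}{
			"name": "dbQuery",
			"arguments": map[string]interface{}{
				"database": "mysql1",
				"query":    "SELECT * FROM users LIMIT 10",
			},
		},
	}
	out, err := json.MarshalIndent(req, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```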
### Tool System
The DB MCP Server includes a powerful AI-aware tool system that provides large language models and AI assistants with a structured way to discover and invoke database tools. Each tool has:
- A unique name discoverable by AI
- A comprehensive description that AI can understand
- A JSON Schema for input validation and AI parameter generation
- A structured output format that AI can parse and reason about
- A handler function that executes the tool's logic with context awareness
This structure enables AI agents to intelligently select, parameterize, and invoke the right database operations without requiring hard-coded knowledge of your specific database schema.
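As a sketch of that structure, here is how a hypothetical tool could be registered with the server's registry, patterned on the registrations in `pkg/dbtools` (the `echo` tool itself is invented for illustration):

```go
package example

import (
	"context"

	"github.com/FreePeak/db-mcp-server/pkg/tools"
)

// RegisterEchoTool registers a hypothetical "echo" tool to show the shape
// of a tool definition: name, description, JSON Schema, and handler.
func RegisterEchoTool(registry *tools.Registry) {
	registry.RegisterTool(&tools.Tool{
		Name:        "echo",
		Description: "Returns the provided message unchanged",
		InputSchema: tools.ToolInputSchema{
			Type: "object",
			Properties: map[string]interface{}{
				"message": map[string]interface{}{
					"type":        "string",
					"description": "Text to echo back",
				},
			},
			Required: []string{"message"},
		},
		Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
			return params["message"], nil
		},
	})
}
```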
### Built-in Tools for AI Integration
The server includes AI-optimized database tools that provide rich context and capabilities:
| Tool | Description | AI Benefits |
|------|-------------|------------|
| `dbQuery` | Executes read-only SQL queries with parameterized inputs | Enables AI to retrieve data with full schema knowledge |
| `dbExecute` | Performs data modification operations (INSERT, UPDATE, DELETE) | Allows AI to safely modify data with constraint awareness |
| `dbTransaction` | Manages SQL transactions with commit and rollback support | Supports AI in creating complex multi-step operations |
| `dbSchema` | Auto-discovers database structure and relationships | Provides AI with complete schema context for reasoning |
| `dbQueryBuilder` | Visual SQL query construction with syntax validation | Helps AI create syntactically correct queries |
| `dbPerformanceAnalyzer` | Identifies slow queries and provides optimization suggestions | Enables AI to suggest performance improvements |
| `showConnectedDatabases` | Shows information about all connected databases | Enables AI to understand available database connections and their status |
### Multiple Database Support
DB MCP Server supports connecting to multiple databases simultaneously, allowing AI agents to work across different database systems in a unified way. Each database connection is identified by a unique ID that can be referenced when using database tools.
#### Configuring Multiple Databases
In your `.env` file:
```
# Multi-Database Configuration
DB_CONFIG_FILE=config/databases.json
```
Configure multiple database connections in your `db-mcp-server/config/databases.json` file or environment variables:
```json
{
"connections": [
{
"id": "mysql1",
"type": "mysql",
"host": "localhost",
"port": 13306,
"user": "user1",
"password": "password1",
"name": "db1"
},
{
"id": "mysql2",
"type": "mysql",
"host": "localhost",
"port": 13307,
"user": "user3",
"password": "password3",
"name": "db3"
},
{
"id": "postgres1",
"type": "postgres",
"host": "localhost",
"port": 15432,
"user": "user2",
"password": "password2",
"name": "db2"
}
]
}
```
#### Viewing Connected Databases
Use the `showConnectedDatabases` tool to see all connected databases with their status and connection information:
```json
// Get information about all connected databases
{
"name": "showConnectedDatabases"
}
```
Example response:
```json
[
{
"id": "mysql1",
"type": "mysql",
"host": "localhost",
"port": 3306,
"database": "db1",
"status": "connected",
"latency": "1.2ms"
},
{
"id": "postgres1",
"type": "postgres",
"host": "localhost",
"port": 5432,
"database": "db2",
"status": "connected",
"latency": "0.8ms"
}
]
```
#### Specifying Database for Operations
When using database tools, you must specify which database to use with the `database` parameter:
```json
// Query a specific database by ID
{
"name": "dbQuery",
"arguments": {
"database": "postgres1",
"query": "SELECT * FROM users LIMIT 10"
}
}
// Execute statement on a specific database
{
"name": "dbExecute",
"arguments": {
"database": "mysql2",
"statement": "UPDATE products SET stock = stock - 1 WHERE id = 5"
}
}
// Get schema from a specific database
{
"name": "dbSchema",
"arguments": {
"database": "mysql1",
"component": "tables"
}
}
```
> **Note**: Always use `database` as the parameter name when specifying which database to use. This is required for all database operation tools.
If your configuration has only one database connection, you must still provide the database ID that matches the ID in your configuration.
### Database Schema Explorer Tool
The MCP Server includes an AI-aware Database Schema Explorer tool (`dbSchema`) that provides AI models with complete database structural knowledge:
```json
// Get all tables in the database - enables AI to understand available data entities
{
"name": "dbSchema",
"arguments": {
"database": "mysql1",
"component": "tables"
}
}
// Get columns for a specific table - gives AI detailed field information
{
"name": "dbSchema",
"arguments": {
"database": "postgres1",
"component": "columns",
"table": "users"
}
}
// Get relationships for a specific table or all relationships - helps AI understand data connections
{
"name": "dbSchema",
"arguments": {
"database": "mysql1",
"component": "relationships",
"table": "orders"
}
}
// Get the full database schema - provides AI with comprehensive structural context
{
"name": "dbSchema",
"arguments": {
"database": "postgres1",
"component": "full"
}
}
```
The Schema Explorer supports both MySQL and PostgreSQL databases, automatically adapting to your configured database type and providing AI with the appropriate contextual information.
### Visual Query Builder Tool
The MCP Server includes a powerful Visual Query Builder tool (`dbQueryBuilder`) that helps you construct SQL queries with syntax validation:
```json
// Validate a SQL query for syntax errors
{
"name": "dbQueryBuilder",
"arguments": {
"database": "mysql1",
"action": "validate",
"query": "SELECT * FROM users WHERE status = 'active'"
}
}
// Build a SQL query from components
{
"name": "dbQueryBuilder",
"arguments": {
"database": "postgres1",
"action": "build",
"components": {
"select": ["id", "name", "email"],
"from": "users",
"where": [
{
"column": "status",
"operator": "=",
"value": "active"
}
],
"orderBy": [
{
"column": "name",
"direction": "ASC"
}
],
"limit": 10
}
}
}
// Analyze a SQL query for potential issues and performance
{
"name": "dbQueryBuilder",
"arguments": {
"database": "mysql1",
"action": "analyze",
"query": "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id WHERE u.status = 'active' AND o.created_at > '2023-01-01'"
}
}
```
Example response from a query build operation:
```json
{
"query": "SELECT id, name, email FROM users WHERE status = 'active' ORDER BY name ASC LIMIT 10",
"components": {
"select": ["id", "name", "email"],
"from": "users",
"where": [{
"column": "status",
"operator": "=",
"value": "active"
}],
"orderBy": [{
"column": "name",
"direction": "ASC"
}],
"limit": 10
},
"validation": {
"valid": true,
"query": "SELECT id, name, email FROM users WHERE status = 'active' ORDER BY name ASC LIMIT 10"
}
}
```
The Query Builder supports:
- SELECT statements with multiple columns
- JOIN operations (inner, left, right, full)
- WHERE conditions with various operators
- GROUP BY and HAVING clauses
- ORDER BY with sorting direction
- LIMIT and OFFSET for pagination
- Syntax validation and error suggestions
- Query complexity analysis
### Performance Analyzer Tool
The MCP Server includes a powerful Performance Analyzer tool (`dbPerformanceAnalyzer`) that identifies slow queries and provides optimization suggestions:
```json
// Get slow queries that exceed the configured threshold
{
"name": "dbPerformanceAnalyzer",
"arguments": {
"database": "mysql1",
"action": "getSlowQueries",
"limit": 5
}
}
// Get metrics for all tracked queries sorted by average duration
{
"name": "dbPerformanceAnalyzer",
"arguments": {
"database": "postgres1",
"action": "getMetrics",
"limit": 10
}
}
// Analyze a specific query for optimization opportunities
{
"name": "dbPerformanceAnalyzer",
"arguments": {
"database": "mysql1",
"action": "analyzeQuery",
"query": "SELECT * FROM orders JOIN users ON orders.user_id = users.id WHERE orders.status = 'pending'"
}
}
// Reset all collected performance metrics
{
"name": "dbPerformanceAnalyzer",
"arguments": {
"database": "postgres1",
"action": "reset"
}
}
// Set the threshold for identifying slow queries (in milliseconds)
{
"name": "dbPerformanceAnalyzer",
"arguments": {
"database": "mysql1",
"action": "setThreshold",
"threshold": 300
}
}
```
Example response from a performance analysis:
```json
{
"query": "SELECT * FROM orders JOIN users ON orders.user_id = users.id WHERE orders.status = 'pending'",
"suggestions": [
"Avoid using SELECT * - specify only the columns you need",
"Verify that ORDER BY columns are properly indexed",
"Consider adding appropriate indexes for frequently queried columns"
]
}
```
Example response from getting slow queries:
```json
{
"metrics": [
{
"query": "SELECT * FROM large_table WHERE status = ?",
"count": 15,
"totalDuration": "2.5s",
"minDuration": "120ms",
"maxDuration": "750ms",
"avgDuration": "166ms",
"lastExecuted": "2025-06-15T14:23:45Z"
},
{
"query": "SELECT order_id, SUM(amount) FROM order_items GROUP BY order_id",
"count": 8,
"totalDuration": "1.2s",
"minDuration": "110ms",
"maxDuration": "580ms",
"avgDuration": "150ms",
"lastExecuted": "2025-06-15T14:20:12Z"
}
],
"count": 2,
"threshold": "100ms"
}
```
The Performance Analyzer automatically tracks all query executions and provides:
- Identification of slow-performing queries
- Query execution metrics (count, min, max, average durations)
- Pattern-based query analysis
- Optimization suggestions
- Performance trend monitoring
- Configurable slow query thresholds
### Database Transactions Tool
For operations that require transaction support, use the `dbTransaction` tool:
```json
// Begin a transaction
{
"name": "dbTransaction",
"arguments": {
"database": "mysql1",
"action": "begin",
"readOnly": false
}
}
// Execute a statement within the transaction
{
"name": "dbTransaction",
"arguments": {
"database": "mysql1",
"action": "execute",
"transactionId": "tx-1684785421293", // ID returned from the begin operation
"statement": "INSERT INTO orders (customer_id, amount) VALUES (?, ?)",
"params": ["123", "450.00"]
}
}
// Commit the transaction
{
"name": "dbTransaction",
"arguments": {
"database": "mysql1",
"action": "commit",
"transactionId": "tx-1684785421293"
}
}
// Rollback the transaction (in case of errors)
{
"name": "dbTransaction",
"arguments": {
"database": "mysql1",
"action": "rollback",
"transactionId": "tx-1684785421293"
}
}
```
### Editor Integration
The server includes support for editor-specific features through the `editor/context` method, enabling tools to be aware of:
- Current SQL file
- Selected query
- Cursor position
- Open database connections
- Database structure
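The payload for `editor/context` is server-defined; as a rough, hypothetical illustration (the field names here are invented, not taken from the protocol), an update might be marshaled like this:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// editorContext is a hypothetical params shape for the editor/context
// method; the real field names are defined by the server implementation.
type editorContext struct {
	File          string `json:"file"`
	SelectedQuery string `json:"selectedQuery"`
	CursorLine    int    `json:"cursorLine"`
	CursorColumn  int    `json:"cursorColumn"`
}

func main() {
	payload := map[string]interface{}{
		"jsonrpc": "2.0",
		"method":  "editor/context",
		"params": editorContext{
			File:          "queries/report.sql",
			SelectedQuery: "SELECT id, name FROM users",
			CursorLine:    3,
			CursorColumn:  12,
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```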
## 🗺️ Roadmap
We're committed to expanding DB MCP Server's AI integration capabilities. Here's our planned development roadmap:
### Q2 2025
- ✅ **AI-Aware Schema Explorer** - Auto-discover database structure and relationships for AI context
- ✅ **Context-Aware Query Builder** - AI-driven SQL query construction with syntax validation
- ✅ **Performance Analyzer with AI Insights** - Identify optimization opportunities with AI recommendations
### Q3 2025
- **AI-Powered Data Visualization** - Create charts and graphs from query results with AI suggestions
- **AI-Driven Model Generator** - Auto-generate code models from database tables using AI patterns
- **Multi-DB Support Expansion with Cross-DB AI Reasoning** - Add support for additional databases, with AI that understands:
  - **MongoDB** - Document-oriented schema for AI reasoning
  - **Redis** - Key-value pattern recognition for AI
  - **SQLite** - Lightweight database understanding
### Q4 2025
- **AI-Assisted Migration Manager** - Version-controlled schema changes with AI recommendations
- **Intelligent Access Control** - AI-aware permissions for database operations
- **Context-Enriched Query History** - Track queries with execution metrics for AI learning
- **Additional Database Integrations with AI Context**:
  - **Cassandra** - Distributed schema understanding
  - **Elasticsearch** - Search-optimized AI interactions
  - **DynamoDB** - NoSQL reasoning capabilities
  - **Oracle** - Enterprise schema comprehension
### Future Vision
- **Complete Database Coverage with Unified AI Context** - Support for all major databases with consistent AI interface
- **AI-Assisted Query Optimization** - Smart recommendations using machine learning
- **Cross-Database AI Operations** - Unified interface for heterogeneous database environments
- **Real-Time Collaborative AI** - Multi-user AI assistance for collaborative database work
- **AI-Powered Plugin System** - Community-driven extension marketplace with AI discovery
## 🤝 Contributing
Contributions are welcome! Here's how you can help:
1. **Fork** the repository
2. **Create** a feature branch: `git checkout -b new-feature`
3. **Commit** your changes: `git commit -am 'Add new feature'`
4. **Push** to the branch: `git push origin new-feature`
5. **Submit** a pull request
Please make sure your code follows our coding standards and includes appropriate tests.
## 📝 License
This project is licensed under the MIT License - see the LICENSE file for details.
## 📧 Support & Contact
- For questions or issues, email [[email protected]](mailto:[email protected])
- Open an issue directly: [Issue Tracker](https://github.com/FreePeak/db-mcp-server/issues)
- If DB MCP Server helps your work, please consider supporting:
<p align="">
<a href="https://www.buymeacoffee.com/linhdmn">
<img src="https://img.buymeacoffee.com/button-api/?text=Support DB MCP Server&emoji=☕&slug=linhdmn&button_colour=FFDD00&font_colour=000000&font_family=Cookie&outline_colour=000000&coffee_colour=ffffff"
alt="Buy Me A Coffee"/>
</a>
</p>
```
--------------------------------------------------------------------------------
/internal/delivery/mcp/timescale_completion.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"context"
"fmt"
)
// CompletionItem represents a code completion item
type CompletionItem struct {
Name string `json:"name"`
Type string `json:"type"`
Documentation string `json:"documentation"`
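// InsertText may contain snippet-style placeholders ($1, $2, ...) that editors expand as tab stops.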
InsertText string `json:"insertText"`
Parameters []string `json:"parameters,omitempty"`
ReturnType string `json:"returnType,omitempty"`
Category string `json:"category,omitempty"`
SortText string `json:"sortText,omitempty"`
FilterText string `json:"filterText,omitempty"`
CommitCharacters []string `json:"commitCharacters,omitempty"`
}
// QuerySuggestion represents a suggested query template for TimescaleDB
type QuerySuggestion struct {
Title string `json:"title"`
Description string `json:"description"`
Query string `json:"query"`
Category string `json:"category"`
}
// TimescaleDBCompletionProvider provides code completion for TimescaleDB functions
type TimescaleDBCompletionProvider struct {
contextProvider *TimescaleDBContextProvider
}
// NewTimescaleDBCompletionProvider creates a new TimescaleDB completion provider
func NewTimescaleDBCompletionProvider() *TimescaleDBCompletionProvider {
return &TimescaleDBCompletionProvider{
contextProvider: NewTimescaleDBContextProvider(),
}
}
// GetTimeBucketCompletions returns completions for time_bucket functions
func (p *TimescaleDBCompletionProvider) GetTimeBucketCompletions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]CompletionItem, error) {
// First check if TimescaleDB is available
tsdbContext, err := p.contextProvider.DetectTimescaleDB(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to detect TimescaleDB: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Define time bucket function completions
completions := []CompletionItem{
{
Name: "time_bucket",
Type: "function",
Documentation: "TimescaleDB function that groups time into buckets. Useful for downsampling time-series data.",
InsertText: "time_bucket($1, $2)",
Parameters: []string{"interval", "timestamp"},
ReturnType: "timestamp",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_bucket_gapfill",
Type: "function",
Documentation: "TimescaleDB function similar to time_bucket but fills in missing values (gaps) in the result.",
InsertText: "time_bucket_gapfill($1, $2)",
Parameters: []string{"interval", "timestamp"},
ReturnType: "timestamp",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_bucket_ng",
Type: "function",
Documentation: "TimescaleDB next-generation time bucket function that supports timezone-aware bucketing.",
InsertText: "time_bucket_ng('$1', $2)",
Parameters: []string{"interval", "timestamp", "timezone"},
ReturnType: "timestamp with time zone",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_bucket",
Type: "function",
Documentation: "TimescaleDB function that groups time into buckets with an offset.",
InsertText: "time_bucket($1, $2, $3)",
Parameters: []string{"interval", "timestamp", "offset"},
ReturnType: "timestamp",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}
// Add version information to documentation
for i := range completions {
completions[i].Documentation = fmt.Sprintf("TimescaleDB v%s: %s", tsdbContext.Version, completions[i].Documentation)
}
return completions, nil
}
// GetHypertableFunctionCompletions returns completions for hypertable management functions
func (p *TimescaleDBCompletionProvider) GetHypertableFunctionCompletions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]CompletionItem, error) {
// First check if TimescaleDB is available
tsdbContext, err := p.contextProvider.DetectTimescaleDB(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to detect TimescaleDB: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Define hypertable function completions
completions := []CompletionItem{
{
Name: "create_hypertable",
Type: "function",
Documentation: "TimescaleDB function that converts a standard PostgreSQL table into a hypertable partitioned by time.",
InsertText: "create_hypertable('$1', '$2')",
Parameters: []string{"table_name", "time_column_name"},
ReturnType: "void",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_dimension",
Type: "function",
Documentation: "TimescaleDB function that adds another dimension to a hypertable for partitioning.",
InsertText: "add_dimension('$1', '$2')",
Parameters: []string{"hypertable", "column_name"},
ReturnType: "void",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_compression_policy",
Type: "function",
Documentation: "TimescaleDB function that adds an automatic compression policy to a hypertable.",
InsertText: "add_compression_policy('$1', INTERVAL '$2')",
Parameters: []string{"hypertable", "older_than"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_retention_policy",
Type: "function",
Documentation: "TimescaleDB function that adds an automatic data retention policy to a hypertable.",
InsertText: "add_retention_policy('$1', INTERVAL '$2')",
Parameters: []string{"hypertable", "drop_after"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "alter_job",
Type: "function",
Documentation: "TimescaleDB function that alters a policy job's schedule or configuration.",
InsertText: "alter_job($1, scheduled => true)",
Parameters: []string{"job_id"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "hypertable_size",
Type: "function",
Documentation: "TimescaleDB function that shows the size of a hypertable, including all chunks.",
InsertText: "hypertable_size('$1')",
Parameters: []string{"hypertable"},
ReturnType: "bigint",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "hypertable_detailed_size",
Type: "function",
Documentation: "TimescaleDB function that shows detailed size information for a hypertable.",
InsertText: "hypertable_detailed_size('$1')",
Parameters: []string{"hypertable"},
ReturnType: "table",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}
// Add version information to documentation
for i := range completions {
completions[i].Documentation = fmt.Sprintf("TimescaleDB v%s: %s", tsdbContext.Version, completions[i].Documentation)
}
return completions, nil
}
// GetContinuousAggregateFunctionCompletions returns completions for continuous aggregate functions
func (p *TimescaleDBCompletionProvider) GetContinuousAggregateFunctionCompletions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]CompletionItem, error) {
// First check if TimescaleDB is available
tsdbContext, err := p.contextProvider.DetectTimescaleDB(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to detect TimescaleDB: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Define continuous aggregate function completions
completions := []CompletionItem{
{
Name: "create_materialized_view",
Type: "function",
Documentation: "TimescaleDB function that creates a continuous aggregate view.",
InsertText: "CREATE MATERIALIZED VIEW $1 WITH (timescaledb.continuous) AS SELECT $2 FROM $3 GROUP BY $4;",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_continuous_aggregate_policy",
Type: "function",
Documentation: "TimescaleDB function that adds a refresh policy to a continuous aggregate.",
InsertText: "add_continuous_aggregate_policy('$1', start_offset => INTERVAL '$2', end_offset => INTERVAL '$3', schedule_interval => INTERVAL '$4')",
Parameters: []string{"continuous_aggregate", "start_offset", "end_offset", "schedule_interval"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "refresh_continuous_aggregate",
Type: "function",
Documentation: "TimescaleDB function that manually refreshes a continuous aggregate for a specific time range.",
InsertText: "refresh_continuous_aggregate('$1', '$2', '$3')",
Parameters: []string{"continuous_aggregate", "start_time", "end_time"},
ReturnType: "void",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}
// Add version information to documentation
for i := range completions {
completions[i].Documentation = fmt.Sprintf("TimescaleDB v%s: %s", tsdbContext.Version, completions[i].Documentation)
}
return completions, nil
}
// GetAnalyticsFunctionCompletions returns completions for TimescaleDB's analytics functions
func (p *TimescaleDBCompletionProvider) GetAnalyticsFunctionCompletions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]CompletionItem, error) {
// First check if TimescaleDB is available
tsdbContext, err := p.contextProvider.DetectTimescaleDB(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to detect TimescaleDB: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Define analytics function completions
completions := []CompletionItem{
{
Name: "first",
Type: "function",
Documentation: "TimescaleDB function that returns the value of the specified column at the first time ordered by time within each group.",
InsertText: "first($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "same as value",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "last",
Type: "function",
Documentation: "TimescaleDB function that returns the value of the specified column at the last time ordered by time within each group.",
InsertText: "last($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "same as value",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_weight",
Type: "function",
Documentation: "TimescaleDB function that returns the time-weighted average of a value over time.",
InsertText: "time_weight($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "double precision",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "histogram",
Type: "function",
Documentation: "TimescaleDB function that buckets values and returns a histogram showing the distribution.",
InsertText: "histogram($1, $2, $3, $4)",
Parameters: []string{"value", "min", "max", "num_buckets"},
ReturnType: "histogram",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "approx_percentile",
Type: "function",
Documentation: "TimescaleDB function that calculates approximate percentiles using the t-digest method.",
InsertText: "approx_percentile($1, $2)",
Parameters: []string{"value", "percentile"},
ReturnType: "double precision",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}
// Add version information to documentation
for i := range completions {
completions[i].Documentation = fmt.Sprintf("TimescaleDB v%s: %s", tsdbContext.Version, completions[i].Documentation)
}
return completions, nil
}
// GetAllFunctionCompletions returns completions for all TimescaleDB functions
func (p *TimescaleDBCompletionProvider) GetAllFunctionCompletions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]CompletionItem, error) {
// Check if TimescaleDB is available by using the DetectTimescaleDB method
// which already checks the database type
tsdbContext, err := p.contextProvider.DetectTimescaleDB(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to detect TimescaleDB: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Define predefined completions for all categories
var allCompletions []CompletionItem
// Time bucket functions
allCompletions = append(allCompletions, []CompletionItem{
{
Name: "time_bucket",
Type: "function",
Documentation: "TimescaleDB function that groups time into buckets. Useful for downsampling time-series data.",
InsertText: "time_bucket($1, $2)",
Parameters: []string{"interval", "timestamp"},
ReturnType: "timestamp",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_bucket_gapfill",
Type: "function",
Documentation: "TimescaleDB function similar to time_bucket but fills in missing values (gaps) in the result.",
InsertText: "time_bucket_gapfill($1, $2)",
Parameters: []string{"interval", "timestamp"},
ReturnType: "timestamp",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_bucket_ng",
Type: "function",
Documentation: "TimescaleDB next-generation time bucket function that supports timezone-aware bucketing.",
InsertText: "time_bucket_ng('$1', $2)",
Parameters: []string{"interval", "timestamp", "timezone"},
ReturnType: "timestamp with time zone",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}...)
// Hypertable functions
allCompletions = append(allCompletions, []CompletionItem{
{
Name: "create_hypertable",
Type: "function",
Documentation: "TimescaleDB function that converts a standard PostgreSQL table into a hypertable partitioned by time.",
InsertText: "create_hypertable('$1', '$2')",
Parameters: []string{"table_name", "time_column_name"},
ReturnType: "void",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_dimension",
Type: "function",
Documentation: "TimescaleDB function that adds another dimension to a hypertable for partitioning.",
InsertText: "add_dimension('$1', '$2')",
Parameters: []string{"hypertable", "column_name"},
ReturnType: "void",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_compression_policy",
Type: "function",
Documentation: "TimescaleDB function that adds an automatic compression policy to a hypertable.",
InsertText: "add_compression_policy('$1', INTERVAL '$2')",
Parameters: []string{"hypertable", "older_than"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_retention_policy",
Type: "function",
Documentation: "TimescaleDB function that adds an automatic data retention policy to a hypertable.",
InsertText: "add_retention_policy('$1', INTERVAL '$2')",
Parameters: []string{"hypertable", "drop_after"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}...)
// Continuous aggregate functions
allCompletions = append(allCompletions, []CompletionItem{
{
Name: "create_materialized_view",
Type: "function",
Documentation: "TimescaleDB function that creates a continuous aggregate view.",
InsertText: "CREATE MATERIALIZED VIEW $1 WITH (timescaledb.continuous) AS SELECT $2 FROM $3 GROUP BY $4;",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "add_continuous_aggregate_policy",
Type: "function",
Documentation: "TimescaleDB function that adds a refresh policy to a continuous aggregate.",
InsertText: "add_continuous_aggregate_policy('$1', start_offset => INTERVAL '$2', end_offset => INTERVAL '$3', schedule_interval => INTERVAL '$4')",
Parameters: []string{"continuous_aggregate", "start_offset", "end_offset", "schedule_interval"},
ReturnType: "integer",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}...)
// Analytics functions
allCompletions = append(allCompletions, []CompletionItem{
{
Name: "first",
Type: "function",
Documentation: "TimescaleDB function that returns the value of the specified column at the first time ordered by time within each group.",
InsertText: "first($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "same as value",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "last",
Type: "function",
Documentation: "TimescaleDB function that returns the value of the specified column at the last time ordered by time within each group.",
InsertText: "last($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "same as value",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
{
Name: "time_weight",
Type: "function",
Documentation: "TimescaleDB function that returns the time-weighted average of a value over time.",
InsertText: "time_weight($1, $2)",
Parameters: []string{"value", "time"},
ReturnType: "double precision",
Category: "TimescaleDB",
CommitCharacters: []string{"("},
},
}...)
// Add version information to documentation
for i := range allCompletions {
allCompletions[i].Documentation = fmt.Sprintf("TimescaleDB v%s: %s", tsdbContext.Version, allCompletions[i].Documentation)
}
return allCompletions, nil
}
// GetQuerySuggestions returns TimescaleDB query suggestions based on the database schema
func (p *TimescaleDBCompletionProvider) GetQuerySuggestions(ctx context.Context, dbID string, useCase UseCaseProvider) ([]QuerySuggestion, error) {
// First check if TimescaleDB is available
tsdbContext, err := p.contextProvider.GetTimescaleDBContext(ctx, dbID, useCase)
if err != nil {
return nil, fmt.Errorf("failed to get TimescaleDB context: %w", err)
}
if !tsdbContext.IsTimescaleDB {
return nil, fmt.Errorf("TimescaleDB is not available in the database %s", dbID)
}
// Base suggestions that don't depend on schema
suggestions := []QuerySuggestion{
{
Title: "Basic Time Bucket Aggregation",
Description: "Groups time-series data into time buckets and calculates aggregates",
Query: "SELECT time_bucket('1 hour', time_column) AS bucket, avg(value_column), min(value_column), max(value_column) FROM table_name WHERE time_column > now() - INTERVAL '1 day' GROUP BY bucket ORDER BY bucket;",
Category: "Time Buckets",
},
{
Title: "Time Bucket with Gap Filling",
Description: "Groups time-series data with gap filling for missing values",
Query: "SELECT time_bucket_gapfill('1 hour', time_column) AS bucket, avg(value_column), min(value_column), max(value_column) FROM table_name WHERE time_column > now() - INTERVAL '1 day' AND time_column <= now() GROUP BY bucket ORDER BY bucket;",
Category: "Time Buckets",
},
{
Title: "Create Hypertable",
Description: "Converts a standard PostgreSQL table into a TimescaleDB hypertable",
Query: "SELECT create_hypertable('table_name', 'time_column');",
Category: "Hypertable Management",
},
{
Title: "Add Compression Policy",
Description: "Adds an automatic compression policy to a hypertable",
Query: "SELECT add_compression_policy('table_name', INTERVAL '7 days');",
Category: "Hypertable Management",
},
{
Title: "Add Retention Policy",
Description: "Adds an automatic data retention policy to a hypertable",
Query: "SELECT add_retention_policy('table_name', INTERVAL '30 days');",
Category: "Hypertable Management",
},
{
Title: "Create Continuous Aggregate",
Description: "Creates a materialized view that automatically maintains aggregated data",
Query: "CREATE MATERIALIZED VIEW view_name WITH (timescaledb.continuous) AS SELECT time_bucket('1 hour', time_column) as bucket, avg(value_column) FROM table_name GROUP BY bucket;",
Category: "Continuous Aggregates",
},
{
Title: "Add Continuous Aggregate Policy",
Description: "Adds a refresh policy to a continuous aggregate",
Query: "SELECT add_continuous_aggregate_policy('view_name', start_offset => INTERVAL '2 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour');",
Category: "Continuous Aggregates",
},
{
Title: "Hypertable Size",
Description: "Shows the size of a hypertable including all chunks",
Query: "SELECT * FROM hypertable_size('table_name');",
Category: "Diagnostics",
},
{
Title: "Hypertable Detailed Size",
Description: "Shows detailed size information for a hypertable",
Query: "SELECT * FROM hypertable_detailed_size('table_name');",
Category: "Diagnostics",
},
{
Title: "Compression Stats",
Description: "Shows compression statistics for a hypertable",
Query: "SELECT * FROM hypertable_compression_stats('table_name');",
Category: "Diagnostics",
},
{
Title: "Job Stats",
Description: "Shows statistics for background jobs like compression and retention policies",
Query: "SELECT * FROM timescaledb_information.jobs;",
Category: "Diagnostics",
},
}
// If we have hypertable information, use it to create tailored suggestions
if len(tsdbContext.Hypertables) > 0 {
for _, ht := range tsdbContext.Hypertables {
tableName := ht.TableName
timeColumn := ht.TimeColumn
// Skip if we don't have both table name and time column
if tableName == "" || timeColumn == "" {
continue
}
// Add schema-specific suggestions
suggestions = append(suggestions, []QuerySuggestion{
{
Title: fmt.Sprintf("Time Bucket Aggregation for %s", tableName),
Description: fmt.Sprintf("Groups data from %s table into time buckets", tableName),
Query: fmt.Sprintf("SELECT time_bucket('1 hour', %s) AS bucket, avg(value_column) FROM %s WHERE %s > now() - INTERVAL '1 day' GROUP BY bucket ORDER BY bucket;", timeColumn, tableName, timeColumn),
Category: "Time Buckets",
},
{
Title: fmt.Sprintf("Compression Policy for %s", tableName),
Description: fmt.Sprintf("Adds compression policy to %s hypertable", tableName),
Query: fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '7 days');", tableName),
Category: "Hypertable Management",
},
{
Title: fmt.Sprintf("Retention Policy for %s", tableName),
Description: fmt.Sprintf("Adds retention policy to %s hypertable", tableName),
Query: fmt.Sprintf("SELECT add_retention_policy('%s', INTERVAL '30 days');", tableName),
Category: "Hypertable Management",
},
{
Title: fmt.Sprintf("Continuous Aggregate for %s", tableName),
Description: fmt.Sprintf("Creates a continuous aggregate view for %s", tableName),
Query: fmt.Sprintf("CREATE MATERIALIZED VIEW %s_hourly WITH (timescaledb.continuous) AS SELECT time_bucket('1 hour', %s) as bucket, avg(value_column) FROM %s GROUP BY bucket;", tableName, timeColumn, tableName),
Category: "Continuous Aggregates",
},
{
Title: fmt.Sprintf("Recent Data from %s", tableName),
Description: fmt.Sprintf("Retrieves recent data from %s with time ordering", tableName),
Query: fmt.Sprintf("SELECT * FROM %s WHERE %s > now() - INTERVAL '1 day' ORDER BY %s DESC LIMIT 100;", tableName, timeColumn, timeColumn),
Category: "Data Retrieval",
},
{
Title: fmt.Sprintf("First/Last Analysis for %s", tableName),
Description: fmt.Sprintf("Uses first/last functions to analyze %s by segments", tableName),
Query: fmt.Sprintf("SELECT segment_column, first(value_column, %s), last(value_column, %s) FROM %s GROUP BY segment_column;", timeColumn, timeColumn, tableName),
Category: "Analytics",
},
}...)
}
}
return suggestions, nil
}
```
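A brief usage sketch for the provider above. Note that `internal/...` packages are importable only from within this module, the `useCase` argument must be supplied by whatever implements the package's `UseCaseProvider` interface, and the database ID `"postgres1"` is just an example:

```go
package mcpexample

import (
	"context"
	"fmt"

	mcp "github.com/FreePeak/db-mcp-server/internal/delivery/mcp"
)

// PrintSuggestions lists TimescaleDB query suggestions for one database.
func PrintSuggestions(ctx context.Context, useCase mcp.UseCaseProvider) error {
	provider := mcp.NewTimescaleDBCompletionProvider()
	suggestions, err := provider.GetQuerySuggestions(ctx, "postgres1", useCase)
	if err != nil {
		return err // e.g. the target database is not TimescaleDB
	}
	for _, s := range suggestions {
		fmt.Printf("[%s] %s\n  %s\n", s.Category, s.Title, s.Query)
	}
	return nil
}
```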
--------------------------------------------------------------------------------
/pkg/dbtools/dbtools.go:
--------------------------------------------------------------------------------
```go
package dbtools
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/FreePeak/db-mcp-server/pkg/db"
"github.com/FreePeak/db-mcp-server/pkg/logger"
"github.com/FreePeak/db-mcp-server/pkg/tools"
)
// TODO: Refactor database connection management to support connection pooling
// TODO: Add support for connection retries and circuit breaking
// TODO: Implement comprehensive metrics collection for database operations
// TODO: Consider using a context-aware connection management system
// TODO: Add support for database migrations and versioning
// TODO: Improve error handling with custom error types
// DatabaseType represents a supported database type
type DatabaseType string
const (
// MySQL database type
MySQL DatabaseType = "mysql"
// Postgres database type
Postgres DatabaseType = "postgres"
)
// Config represents database configuration
type Config struct {
ConfigFile string
Connections []ConnectionConfig
}
// ConnectionConfig represents a single database connection configuration
type ConnectionConfig struct {
ID string `json:"id"`
Type DatabaseType `json:"type"`
Host string `json:"host"`
Port int `json:"port"`
Name string `json:"name"`
User string `json:"user"`
Password string `json:"password"`
}
// MultiDBConfig represents configuration for multiple database connections
type MultiDBConfig struct {
Connections []ConnectionConfig `json:"connections"`
}
// Database connection manager (singleton)
var (
dbManager *db.Manager
)
// DatabaseConnectionInfo represents detailed information about a database connection
type DatabaseConnectionInfo struct {
ID string `json:"id"`
Type DatabaseType `json:"type"`
Host string `json:"host"`
Port int `json:"port"`
Name string `json:"name"`
Status string `json:"status"`
Latency string `json:"latency,omitempty"`
}
// InitDatabase initializes the database connections
func InitDatabase(cfg *Config) error {
// Create database manager
dbManager = db.NewDBManager()
var multiDBConfig *MultiDBConfig
// If config file is provided, load it
if cfg != nil && cfg.ConfigFile != "" {
// Read config file
configData, err := os.ReadFile(cfg.ConfigFile)
if err != nil {
logger.Warn("Warning: failed to read config file %s: %v", cfg.ConfigFile, err)
// Don't return error, try other methods
} else {
// Parse config
multiDBConfig = &MultiDBConfig{}
if err := json.Unmarshal(configData, multiDBConfig); err != nil {
logger.Warn("Warning: failed to parse config file %s: %v", cfg.ConfigFile, err)
// Don't return error, try other methods
} else {
logger.Info("Loaded database config from file: %s", cfg.ConfigFile)
// Debug logging of connection details
for i, conn := range multiDBConfig.Connections {
logger.Info("Connection [%d]: ID=%s, Type=%s, Host=%s, Port=%d, Name=%s",
i, conn.ID, conn.Type, conn.Host, conn.Port, conn.Name)
}
}
}
}
// If config was not loaded from file, try direct connections config
if multiDBConfig == nil || len(multiDBConfig.Connections) == 0 {
if cfg != nil && len(cfg.Connections) > 0 {
// Use connections from direct config
multiDBConfig = &MultiDBConfig{
Connections: cfg.Connections,
}
logger.Info("Using database connections from direct configuration")
} else {
// Try to load from environment variable
dbConfigJSON := os.Getenv("DB_CONFIG")
if dbConfigJSON != "" {
multiDBConfig = &MultiDBConfig{}
if err := json.Unmarshal([]byte(dbConfigJSON), multiDBConfig); err != nil {
logger.Warn("Warning: failed to parse DB_CONFIG environment variable: %v", err)
// Don't return error, try legacy method
} else {
logger.Info("Loaded database config from DB_CONFIG environment variable")
}
}
}
}
// If no config loaded yet, try legacy single connection from environment
if multiDBConfig == nil || len(multiDBConfig.Connections) == 0 {
// Create a single connection from environment variables
dbType := os.Getenv("DB_TYPE")
if dbType == "" {
dbType = "mysql" // Default type
}
dbHost := os.Getenv("DB_HOST")
dbPortStr := os.Getenv("DB_PORT")
dbUser := os.Getenv("DB_USER")
dbPassword := os.Getenv("DB_PASSWORD")
dbName := os.Getenv("DB_NAME")
// If we have basic connection details, create a config
if dbHost != "" && dbUser != "" {
dbPort, err := strconv.Atoi(dbPortStr)
if err != nil || dbPort == 0 {
dbPort = 3306 // Default MySQL port
}
multiDBConfig = &MultiDBConfig{
Connections: []ConnectionConfig{
{
ID: "default",
Type: DatabaseType(dbType),
Host: dbHost,
Port: dbPort,
Name: dbName,
User: dbUser,
Password: dbPassword,
},
},
}
logger.Info("Created database config from environment variables")
}
}
// If still no config, return error
if multiDBConfig == nil || len(multiDBConfig.Connections) == 0 {
return fmt.Errorf("no database configuration provided")
}
// Convert config to JSON for loading
configJSON, err := json.Marshal(multiDBConfig)
if err != nil {
return fmt.Errorf("failed to marshal database config: %w", err)
}
if err := dbManager.LoadConfig(configJSON); err != nil {
return fmt.Errorf("failed to load database config: %w", err)
}
// Connect to all databases
if err := dbManager.Connect(); err != nil {
return fmt.Errorf("failed to connect to databases: %w", err)
}
// Log connected databases
dbs := dbManager.ListDatabases()
logger.Info("Connected to %d databases: %v", len(dbs), dbs)
return nil
}
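// Example (illustrative, not part of the package): a typical caller
// initializes connections at startup and closes them on shutdown:
//
//	if err := dbtools.InitDatabase(&dbtools.Config{ConfigFile: "config.json"}); err != nil {
//		log.Fatalf("database init failed: %v", err)
//	}
//	defer func() { _ = dbtools.CloseDatabase() }()
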
// CloseDatabase closes all database connections
func CloseDatabase() error {
if dbManager == nil {
return nil
}
return dbManager.CloseAll()
}
// GetDatabase returns a database instance by ID
func GetDatabase(id string) (db.Database, error) {
if dbManager == nil {
return nil, fmt.Errorf("database manager not initialized")
}
return dbManager.GetDatabase(id)
}
// ListDatabases returns a list of available database connections
func ListDatabases() []string {
if dbManager == nil {
return nil
}
return dbManager.ListDatabases()
}
// showConnectedDatabases returns information about all connected databases
func showConnectedDatabases(ctx context.Context, params map[string]interface{}) (interface{}, error) {
if dbManager == nil {
return nil, fmt.Errorf("database manager not initialized")
}
var connections []DatabaseConnectionInfo
dbIDs := ListDatabases()
for _, dbID := range dbIDs {
database, err := GetDatabase(dbID)
if err != nil {
continue
}
// Get connection details
connInfo := DatabaseConnectionInfo{
ID: dbID,
}
// Check connection status and measure latency
start := time.Now()
err = database.Ping(ctx)
latency := time.Since(start)
if err != nil {
connInfo.Status = "disconnected"
connInfo.Latency = "n/a"
} else {
connInfo.Status = "connected"
connInfo.Latency = latency.String()
}
connections = append(connections, connInfo)
}
return connections, nil
}
// RegisterDatabaseTools registers all database tools with the provided registry
func RegisterDatabaseTools(registry *tools.Registry) error {
// Register schema explorer tool
registry.RegisterTool(&tools.Tool{
Name: "dbSchema",
Description: "Auto-discover database structure and relationships",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"database": map[string]interface{}{
"type": "string",
"description": "Database name to explore (optional, leave empty for all databases)",
},
"component": map[string]interface{}{
"type": "string",
"description": "Component to explore (tables, columns, indices, or all)",
"enum": []string{"tables", "columns", "indices", "all"},
},
"table": map[string]interface{}{
"type": "string",
"description": "Specific table to explore (optional)",
},
},
},
Handler: handleSchemaExplorer,
})
// Register query tool
registry.RegisterTool(&tools.Tool{
Name: "dbQuery",
Description: "Execute SQL query and return results",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"query": map[string]interface{}{
"type": "string",
"description": "SQL query to execute",
},
"database": map[string]interface{}{
"type": "string",
"description": "Database ID to query (optional if only one database is configured)",
},
"params": map[string]interface{}{
"type": "array",
"description": "Parameters for the query (for prepared statements)",
"items": map[string]interface{}{
"type": "string",
},
},
"timeout": map[string]interface{}{
"type": "integer",
"description": "Query timeout in milliseconds (default: 5000)",
},
},
Required: []string{"query"},
},
Handler: handleQuery,
})
// Register execute tool
registry.RegisterTool(&tools.Tool{
Name: "dbExecute",
Description: "Execute a database statement that doesn't return results (INSERT, UPDATE, DELETE, etc.)",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"statement": map[string]interface{}{
"type": "string",
"description": "SQL statement to execute",
},
"database": map[string]interface{}{
"type": "string",
"description": "Database ID to query (optional if only one database is configured)",
},
"params": map[string]interface{}{
"type": "array",
"description": "Parameters for the statement (for prepared statements)",
"items": map[string]interface{}{
"type": "string",
},
},
"timeout": map[string]interface{}{
"type": "integer",
"description": "Statement timeout in milliseconds (default: 5000)",
},
},
Required: []string{"statement"},
},
Handler: handleExecute,
})
// Register list databases tool
registry.RegisterTool(&tools.Tool{
Name: "dbList",
Description: "List all available database connections",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"showStatus": map[string]interface{}{
"type": "boolean",
"description": "Show connection status and latency",
},
},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
// Show connection status?
showStatus, ok := params["showStatus"].(bool)
if ok && showStatus {
return showConnectedDatabases(ctx, params)
}
// Just list database IDs
return ListDatabases(), nil
},
})
// Register query builder tool
registry.RegisterTool(&tools.Tool{
Name: "dbQueryBuilder",
Description: "Build SQL queries visually",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"action": map[string]interface{}{
"type": "string",
"description": "Action to perform (build, validate, format)",
"enum": []string{"build", "validate", "format"},
},
"query": map[string]interface{}{
"type": "string",
"description": "SQL query to validate or format",
},
"database": map[string]interface{}{
"type": "string",
"description": "Database ID to use for validation",
},
"components": map[string]interface{}{
"type": "object",
"description": "Query components (for build action)",
},
},
Required: []string{"action"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
// Just a placeholder for now
actionVal, ok := params["action"].(string)
if !ok {
return nil, fmt.Errorf("missing or invalid 'action' parameter")
}
return fmt.Sprintf("Query builder %s action not implemented yet", actionVal), nil
},
})
// Register Cursor-compatible tool handlers
// TODO: Implement or import this function
// tools.RegisterCursorCompatibleToolHandlers(registry)
return nil
}
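// exampleRegisterAtStartup is an illustrative wiring sketch, not part of the
// original file. It assumes a tools.NewRegistry constructor; if the registry
// is created differently in this codebase, treat that call as a placeholder.
func exampleRegisterAtStartup() error {
	registry := tools.NewRegistry()
	return RegisterDatabaseTools(registry)
}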
// PingDatabase pings a database to check the connection
func PingDatabase(db *sql.DB) error {
return db.Ping()
}
// rowsToMaps converts sql.Rows to a slice of maps
func rowsToMaps(rows *sql.Rows) ([]map[string]interface{}, error) {
// Get column names
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// Make a slice for the values
values := make([]interface{}, len(columns))
// Create references for the values
valueRefs := make([]interface{}, len(columns))
for i := range columns {
valueRefs[i] = &values[i]
}
// Create the slice to store results
var results []map[string]interface{}
// Fetch rows
for rows.Next() {
// Scan the result into the pointers
err := rows.Scan(valueRefs...)
if err != nil {
return nil, err
}
// Create a map for this row
result := make(map[string]interface{})
for i, column := range columns {
val := values[i]
// Handle null values
if val == nil {
result[column] = nil
continue
}
// Convert bytes to string for easier JSON serialization
if b, ok := val.([]byte); ok {
result[column] = string(b)
} else {
result[column] = val
}
}
results = append(results, result)
}
if err := rows.Err(); err != nil {
return nil, err
}
return results, nil
}
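// exampleRowsToJSON is an illustrative sketch, not part of the original file:
// run a query and serialize the result via rowsToMaps. The table and column
// names are placeholders.
func exampleRowsToJSON(ctx context.Context, database db.Database) ([]byte, error) {
	rows, err := database.Query(ctx, "SELECT id, name FROM users")
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := rows.Close(); cerr != nil {
			logger.Warn("Error closing rows: %v", cerr)
		}
	}()
	results, err := rowsToMaps(rows)
	if err != nil {
		return nil, err
	}
	return json.Marshal(results)
}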
// getStringParam safely extracts a string parameter from the params map
func getStringParam(params map[string]interface{}, key string) (string, bool) {
if val, ok := params[key].(string); ok {
return val, true
}
return "", false
}
// getIntParam safely extracts an int parameter from the params map
func getIntParam(params map[string]interface{}, key string) (int, bool) {
switch v := params[key].(type) {
case int:
return v, true
case float64:
return int(v), true
case int64:
return int(v), true
case json.Number:
if i, err := v.Int64(); err == nil {
return int(i), true
}
}
return 0, false
}
// getArrayParam safely extracts an array parameter from the params map
func getArrayParam(params map[string]interface{}, key string) ([]interface{}, bool) {
if val, ok := params[key].([]interface{}); ok {
return val, true
}
return nil, false
}
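// exampleExtractParams is an illustrative sketch, not part of the original
// file: it shows how the helpers above typically combine inside a tool
// handler, using the default documented in the dbQuery schema.
func exampleExtractParams(params map[string]interface{}) (string, int, error) {
	query, ok := getStringParam(params, "query")
	if !ok {
		return "", 0, fmt.Errorf("missing or invalid 'query' parameter")
	}
	timeout, ok := getIntParam(params, "timeout")
	if !ok {
		timeout = 5000 // default timeout in milliseconds
	}
	return query, timeout, nil
}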
// _loadConfigFromFile loads database configuration from a file (currently unused)
func _loadConfigFromFile(cfg *Config) (*db.MultiDBConfig, error) {
if cfg.ConfigFile == "" {
return nil, fmt.Errorf("no config file specified")
}
// If path is not absolute, make it absolute
absPath := cfg.ConfigFile
if !filepath.IsAbs(absPath) {
var err error
absPath, err = filepath.Abs(absPath)
if err != nil {
return nil, fmt.Errorf("failed to resolve absolute path: %w", err)
}
}
// Read configuration file
configData, err := os.ReadFile(absPath)
if err != nil {
logger.Warn("Warning: failed to read config file %s: %v", cfg.ConfigFile, err)
return nil, err
}
// Parse JSON
var dbConfig db.MultiDBConfig
if err := json.Unmarshal(configData, &dbConfig); err != nil {
logger.Warn("Warning: failed to parse config file %s: %v", cfg.ConfigFile, err)
return nil, err
}
logger.Info("Loaded database config from file: %s", cfg.ConfigFile)
// Debug logging of connection details
for i, conn := range dbConfig.Connections {
logger.Info("Connection [%d]: ID=%s, Type=%s, Host=%s, Port=%d, Name=%s",
i, conn.ID, conn.Type, conn.Host, conn.Port, conn.Name)
}
return &dbConfig, nil
}
// _getEnv gets an environment variable or returns a default value (currently unused)
func _getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
// _getIntEnv gets an environment variable as an integer or returns a default value (currently unused)
func _getIntEnv(key string, defaultValue int) int {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
intValue, err := strconv.Atoi(value)
if err != nil {
return defaultValue
}
return intValue
}
// _loadConfigFromEnv loads database configuration from the environment (currently unused)
func _loadConfigFromEnv() (*db.MultiDBConfig, error) {
// Check if DB_CONFIG environment variable is set
dbConfigEnv := os.Getenv("DB_CONFIG")
if dbConfigEnv != "" {
var dbConfig db.MultiDBConfig
if err := json.Unmarshal([]byte(dbConfigEnv), &dbConfig); err != nil {
logger.Warn("Warning: failed to parse DB_CONFIG environment variable: %v", err)
return nil, err
}
logger.Info("Loaded database config from DB_CONFIG environment variable")
return &dbConfig, nil
}
// Create config from individual environment variables
// Load database configuration from environment variables
dbType := _getEnv("DB_TYPE", "mysql")
dbHost := _getEnv("DB_HOST", "localhost")
dbPort := _getIntEnv("DB_PORT", 3306)
dbUser := _getEnv("DB_USER", "")
dbPass := _getEnv("DB_PASSWORD", "")
dbName := _getEnv("DB_NAME", "")
// Create a default configuration with a single connection
dbConfig := &db.MultiDBConfig{
Connections: []db.DatabaseConnectionConfig{
{
ID: "default",
Type: dbType,
Host: dbHost,
Port: dbPort,
User: dbUser,
Password: dbPass,
Name: dbName,
},
},
}
logger.Info("Created database config from environment variables")
return dbConfig, nil
}
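// Illustrative DB_CONFIG payload accepted by _loadConfigFromEnv. The JSON
// field names assume lowercase struct tags on db.MultiDBConfig and
// db.DatabaseConnectionConfig; adjust them if the actual tags differ:
//
//	DB_CONFIG='{"connections":[{"id":"default","type":"mysql","host":"localhost","port":3306,"user":"root","password":"secret","name":"mydb"}]}'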
// GetDatabaseQueryTimeout returns the query timeout for a database in milliseconds
func GetDatabaseQueryTimeout(db db.Database) int {
// Get the query timeout from the database configuration
// Default to 30 seconds (30000ms) if not configured
defaultTimeout := 30000 // ms
if dbConfig, ok := db.(interface{ QueryTimeout() int }); ok {
if timeout := dbConfig.QueryTimeout(); timeout > 0 {
return timeout * 1000 // Convert from seconds to milliseconds
}
}
return defaultTimeout
}
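// exampleWithQueryTimeout is an illustrative sketch, not part of the original
// file: derive a context deadline from the configured per-database timeout.
func exampleWithQueryTimeout(ctx context.Context, database db.Database) (context.Context, context.CancelFunc) {
	ms := GetDatabaseQueryTimeout(database)
	return context.WithTimeout(ctx, time.Duration(ms)*time.Millisecond)
}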
// RegisterMCPDatabaseTools registers database tools specifically formatted for MCP compatibility
func RegisterMCPDatabaseTools(registry *tools.Registry) error {
// Get available databases
dbs := ListDatabases()
// If no databases are available, register mock tools
if len(dbs) == 0 {
return registerMCPMockTools(registry)
}
// Register MCP tools for each database
for _, dbID := range dbs {
dbID := dbID // capture a per-iteration copy so each handler closure below binds its own database ID (required on Go < 1.22)
// Register query tool for this database
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("query_%s", dbID),
Description: fmt.Sprintf("Execute SQL query on %s database", dbID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"query": map[string]interface{}{
"type": "string",
"description": "SQL query to execute",
},
"params": map[string]interface{}{
"type": "array",
"description": "Query parameters",
"items": map[string]interface{}{
"type": "string",
},
},
},
Required: []string{"query"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return handleQueryForDatabase(ctx, params, dbID)
},
})
// Register execute tool for this database
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("execute_%s", dbID),
Description: fmt.Sprintf("Execute SQL statement on %s database", dbID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"statement": map[string]interface{}{
"type": "string",
"description": "SQL statement to execute",
},
"params": map[string]interface{}{
"type": "array",
"description": "Statement parameters",
"items": map[string]interface{}{
"type": "string",
},
},
},
Required: []string{"statement"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return handleExecuteForDatabase(ctx, params, dbID)
},
})
// Register transaction tool for this database
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("transaction_%s", dbID),
Description: fmt.Sprintf("Manage transactions on %s database", dbID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"action": map[string]interface{}{
"type": "string",
"description": "Transaction action (begin, commit, rollback, execute)",
"enum": []string{"begin", "commit", "rollback", "execute"},
},
"transactionId": map[string]interface{}{
"type": "string",
"description": "Transaction ID (required for commit, rollback, execute)",
},
"statement": map[string]interface{}{
"type": "string",
"description": "SQL statement to execute within transaction (required for execute)",
},
"params": map[string]interface{}{
"type": "array",
"description": "Statement parameters",
"items": map[string]interface{}{
"type": "string",
},
},
"readOnly": map[string]interface{}{
"type": "boolean",
"description": "Whether the transaction is read-only (for begin)",
},
},
Required: []string{"action"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return handleTransactionForDatabase(ctx, params, dbID)
},
})
// Register performance tool for this database
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("performance_%s", dbID),
Description: fmt.Sprintf("Analyze query performance on %s database", dbID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"action": map[string]interface{}{
"type": "string",
"description": "Action (getSlowQueries, getMetrics, analyzeQuery, reset, setThreshold)",
"enum": []string{"getSlowQueries", "getMetrics", "analyzeQuery", "reset", "setThreshold"},
},
"query": map[string]interface{}{
"type": "string",
"description": "SQL query to analyze (required for analyzeQuery)",
},
"threshold": map[string]interface{}{
"type": "number",
"description": "Slow query threshold in milliseconds (required for setThreshold)",
},
"limit": map[string]interface{}{
"type": "number",
"description": "Maximum number of results to return",
},
},
Required: []string{"action"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return handlePerformanceForDatabase(ctx, params, dbID)
},
})
// Register schema tool for this database
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("schema_%s", dbID),
Description: fmt.Sprintf("Get schema of on %s database", dbID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"random_string": map[string]interface{}{
"type": "string",
"description": "Dummy parameter (optional)",
},
},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return handleSchemaForDatabase(ctx, params, dbID)
},
})
}
// Register list_databases tool
registry.RegisterTool(&tools.Tool{
Name: "list_databases",
Description: "List all available databases on database",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"random_string": map[string]interface{}{
"type": "string",
"description": "Dummy parameter (optional)",
},
},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
dbs := ListDatabases()
output := "Available databases:\n\n"
for i, db := range dbs {
output += fmt.Sprintf("%d. %s\n", i+1, db)
}
if len(dbs) == 0 {
output += "No databases configured.\n"
}
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": output},
},
}, nil
},
})
return nil
}
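// For illustration: a configured connection with ID "mydb" yields the tools
// query_mydb, execute_mydb, transaction_mydb, performance_mydb and
// schema_mydb from the loop above, plus the shared list_databases tool.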
// Helper function to create mock tools for MCP compatibility
func registerMCPMockTools(registry *tools.Registry) error {
mockDBID := "mock"
// Register mock query tool
registry.RegisterTool(&tools.Tool{
Name: fmt.Sprintf("query_%s", mockDBID),
Description: fmt.Sprintf("Execute SQL query on %s database", mockDBID),
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"query": map[string]interface{}{
"type": "string",
"description": "SQL query to execute",
},
"params": map[string]interface{}{
"type": "array",
"description": "Query parameters",
"items": map[string]interface{}{
"type": "string",
},
},
},
Required: []string{"query"},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
query, _ := getStringParam(params, "query")
return map[string]interface{}{
"content": []map[string]interface{}{
{
"type": "text",
"text": fmt.Sprintf("Mock query executed:\n%s\n\nThis is a mock response.", query),
},
},
"mock": true,
}, nil
},
})
// Register list_databases tool
registry.RegisterTool(&tools.Tool{
Name: "list_databases",
Description: "List all available databases on database",
InputSchema: tools.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"random_string": map[string]interface{}{
"type": "string",
"description": "Dummy parameter (optional)",
},
},
},
Handler: func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
return map[string]interface{}{
"content": []map[string]interface{}{
{
"type": "text",
"text": "Available databases:\n\n1. mock (not connected)\n",
},
},
"mock": true,
}, nil
},
})
return nil
}
// Handler functions for specific databases
func handleQueryForDatabase(ctx context.Context, params map[string]interface{}, dbID string) (interface{}, error) {
query, _ := getStringParam(params, "query")
paramList, hasParams := getArrayParam(params, "params")
var queryParams []interface{}
if hasParams {
queryParams = paramList
}
result, err := executeQueryWithParams(ctx, dbID, query, queryParams)
if err != nil {
return createErrorResponse(fmt.Sprintf("Error executing query on %s: %v", dbID, err)), nil
}
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": fmt.Sprintf("Results:\n\n%s", result)},
},
}, nil
}
func handleExecuteForDatabase(ctx context.Context, params map[string]interface{}, dbID string) (interface{}, error) {
statement, _ := getStringParam(params, "statement")
paramList, hasParams := getArrayParam(params, "params")
var stmtParams []interface{}
if hasParams {
stmtParams = paramList
}
result, err := executeStatementWithParams(ctx, dbID, statement, stmtParams)
if err != nil {
return createErrorResponse(fmt.Sprintf("Error executing statement on %s: %v", dbID, err)), nil
}
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": result},
},
}, nil
}
func handleTransactionForDatabase(ctx context.Context, params map[string]interface{}, dbID string) (interface{}, error) {
action, _ := getStringParam(params, "action")
txID, hasTxID := getStringParam(params, "transactionId")
statement, hasStatement := getStringParam(params, "statement")
// The optional readOnly flag defaults to false when absent or mistyped
readOnly := false
if val, ok := params["readOnly"].(bool); ok {
readOnly = val
}
paramList, hasParams := getArrayParam(params, "params")
var stmtParams []interface{}
if hasParams {
stmtParams = paramList
}
switch action {
case "begin":
// Generate transaction ID if not provided
if !hasTxID {
txID = fmt.Sprintf("tx_%s_%d", dbID, time.Now().Unix())
}
// Start transaction
db, err := GetDatabase(dbID)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to get database %s: %v", dbID, err)), nil
}
// Set read-only option if specified
var opts *sql.TxOptions
if readOnly {
opts = &sql.TxOptions{ReadOnly: true}
}
tx, err := db.BeginTx(ctx, opts)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to begin transaction: %v", err)), nil
}
// Store transaction
if err := storeTransaction(txID, tx); err != nil {
return createErrorResponse(fmt.Sprintf("Failed to store transaction: %v", err)), nil
}
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": "Transaction started"},
},
"metadata": map[string]interface{}{
"transactionId": txID,
},
}, nil
case "commit":
if !hasTxID {
return createErrorResponse("transactionId is required for commit action"), nil
}
tx, err := getTransaction(txID)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to get transaction %s: %v", txID, err)), nil
}
if err := tx.Commit(); err != nil {
return createErrorResponse(fmt.Sprintf("Failed to commit transaction: %v", err)), nil
}
// Remove transaction
removeTransaction(txID)
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": "Transaction committed"},
},
}, nil
case "rollback":
if !hasTxID {
return createErrorResponse("transactionId is required for rollback action"), nil
}
tx, err := getTransaction(txID)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to get transaction %s: %v", txID, err)), nil
}
if err := tx.Rollback(); err != nil {
return createErrorResponse(fmt.Sprintf("Failed to rollback transaction: %v", err)), nil
}
// Remove transaction
removeTransaction(txID)
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": "Transaction rolled back"},
},
}, nil
case "execute":
if !hasTxID {
return createErrorResponse("transactionId is required for execute action"), nil
}
if !hasStatement {
return createErrorResponse("statement is required for execute action"), nil
}
tx, err := getTransaction(txID)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to get transaction %s: %v", txID, err)), nil
}
// Execute statement
_, err = tx.Exec(statement, stmtParamsToInterfaceSlice(stmtParams)...)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to execute statement in transaction: %v", err)), nil
}
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": "Statement executed in transaction"},
},
}, nil
default:
return createErrorResponse(fmt.Sprintf("Unknown transaction action: %s", action)), nil
}
}
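// Illustrative call sequence for the transaction handler above, shown as the
// parameter maps an MCP client would send (the transaction ID is an example):
//
//	{"action": "begin", "readOnly": false}
//	{"action": "execute", "transactionId": "tx_mydb_1700000000", "statement": "INSERT INTO t(v) VALUES (?)", "params": ["1"]}
//	{"action": "commit", "transactionId": "tx_mydb_1700000000"}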
func handlePerformanceForDatabase(ctx context.Context, params map[string]interface{}, dbID string) (interface{}, error) {
action, _ := getStringParam(params, "action")
// Create response with basic info about the action
limitVal, hasLimit := params["limit"].(float64)
limit := 10
if hasLimit {
limit = int(limitVal)
}
// Return a basic mock response for the performance action
return map[string]interface{}{
"content": []map[string]interface{}{
{
"type": "text",
"text": fmt.Sprintf("Performance analysis for action '%s' on database '%s'\nLimit: %d\n", action, dbID, limit),
},
},
}, nil
}
func handleSchemaForDatabase(ctx context.Context, params map[string]interface{}, dbID string) (interface{}, error) {
// Try to get database schema
db, err := GetDatabase(dbID)
if err != nil {
return createErrorResponse(fmt.Sprintf("Failed to get database %s: %v", dbID, err)), nil
}
// Get database type for more accurate schema reporting
var dbType string
switch db.DriverName() {
case "mysql":
dbType = "mysql"
case "postgres":
dbType = "postgres"
default:
dbType = "unknown"
}
// Get schema information
schema := getBasicSchemaInfo(ctx, db, dbID, dbType)
return map[string]interface{}{
"content": []map[string]interface{}{
{
"type": "text",
"text": fmt.Sprintf("Database Schema for %s:\n\n%v", dbID, schema),
},
},
}, nil
}
// Helper function to get basic schema info
func getBasicSchemaInfo(ctx context.Context, db db.Database, dbID, dbType string) map[string]interface{} {
result := map[string]interface{}{
"database": dbID,
"dbType": dbType,
"tables": []map[string]string{},
}
// Try to get table list - a simple query that should work on most databases
var query string
switch dbType {
case "mysql":
query = "SHOW TABLES"
case "postgres":
query = "SELECT tablename AS TABLE_NAME FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema')"
default:
// Generic query that might work
query = "SELECT name FROM sqlite_master WHERE type='table'"
}
rows, err := db.Query(ctx, query)
if err != nil {
// Return empty schema if query fails
return result
}
defer func() {
if cerr := rows.Close(); cerr != nil {
logger.Warn("Error closing rows: %v", cerr)
}
}()
tables := []map[string]string{}
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
continue
}
tables = append(tables, map[string]string{"TABLE_NAME": tableName})
}
result["tables"] = tables
return result
}
// Helper functions for parameter conversion
func stmtParamsToInterfaceSlice(params []interface{}) []interface{} {
result := make([]interface{}, len(params))
copy(result, params)
return result
}
func createErrorResponse(message string) map[string]interface{} {
return map[string]interface{}{
"content": []map[string]interface{}{
{"type": "text", "text": fmt.Sprintf("Error: %s", message)},
},
"isError": true,
}
}
// executeQueryWithParams executes a query with the given parameters
func executeQueryWithParams(ctx context.Context, dbID, query string, params []interface{}) (string, error) {
db, err := GetDatabase(dbID)
if err != nil {
return "", fmt.Errorf("failed to get database %s: %w", dbID, err)
}
rows, err := db.Query(ctx, query, params...)
if err != nil {
return "", fmt.Errorf("failed to execute query: %w", err)
}
defer func() {
if cerr := rows.Close(); cerr != nil {
logger.Warn("Error closing rows: %v", cerr)
}
}()
// Convert rows to string representation
result, err := formatRows(rows)
if err != nil {
return "", fmt.Errorf("failed to format rows: %w", err)
}
return result, nil
}
// executeStatementWithParams executes a statement with the given parameters
func executeStatementWithParams(ctx context.Context, dbID, statement string, params []interface{}) (string, error) {
db, err := GetDatabase(dbID)
if err != nil {
return "", fmt.Errorf("failed to get database %s: %w", dbID, err)
}
result, err := db.Exec(ctx, statement, params...)
if err != nil {
return "", fmt.Errorf("failed to execute statement: %w", err)
}
// Get affected rows
rowsAffected, err := result.RowsAffected()
if err != nil {
rowsAffected = 0
}
// Get last insert ID (might not be supported by all databases)
lastInsertID, err := result.LastInsertId()
if err != nil {
lastInsertID = 0
}
return fmt.Sprintf("Statement executed successfully.\nRows affected: %d\nLast insert ID: %d", rowsAffected, lastInsertID), nil
}
// storeTransaction stores a transaction with the given ID
func storeTransaction(id string, tx *sql.Tx) error {
// Check if transaction already exists
_, exists := GetTransaction(id)
if exists {
return fmt.Errorf("transaction with ID %s already exists", id)
}
StoreTransaction(id, tx)
return nil
}
// getTransaction retrieves a transaction by ID
func getTransaction(id string) (*sql.Tx, error) {
tx, exists := GetTransaction(id)
if !exists {
return nil, fmt.Errorf("transaction with ID %s not found", id)
}
return tx, nil
}
// removeTransaction removes a transaction from storage
func removeTransaction(id string) {
RemoveTransaction(id)
}
// formatRows formats SQL rows as a string table
func formatRows(rows *sql.Rows) (string, error) {
// Get column names
columns, err := rows.Columns()
if err != nil {
return "", err
}
// Prepare column value holders
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range columns {
valuePtrs[i] = &values[i]
}
// Build header
var sb strings.Builder
for i, col := range columns {
if i > 0 {
sb.WriteString("\t")
}
sb.WriteString(col)
}
sb.WriteString("\n")
// Add separator
sb.WriteString(strings.Repeat("-", 80))
sb.WriteString("\n")
// Process rows
rowCount := 0
for rows.Next() {
rowCount++
if err := rows.Scan(valuePtrs...); err != nil {
return "", err
}
// Format row values
for i, val := range values {
if i > 0 {
sb.WriteString("\t")
}
sb.WriteString(formatValue(val))
}
sb.WriteString("\n")
}
if err := rows.Err(); err != nil {
return "", err
}
// Add total row count
sb.WriteString(fmt.Sprintf("\nTotal rows: %d", rowCount))
return sb.String(), nil
}
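// Illustrative formatRows output for a two-column result set:
//
//	id	name
//	--------------------------------------------------------------------------------
//	1	alice
//	2	bob
//
//	Total rows: 2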
// formatValue converts a value to string representation
func formatValue(val interface{}) string {
if val == nil {
return "NULL"
}
switch v := val.(type) {
case []byte:
return string(v)
case time.Time:
return v.String()
default:
return fmt.Sprintf("%v", v)
}
}
```
--------------------------------------------------------------------------------
/internal/delivery/mcp/timescale_tool.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/FreePeak/cortex/pkg/server"
cortextools "github.com/FreePeak/cortex/pkg/tools"
)
// TimescaleDBTool implements a tool for TimescaleDB operations
type TimescaleDBTool struct {
name string
description string
}
// NewTimescaleDBTool creates a new TimescaleDB tool
func NewTimescaleDBTool() *TimescaleDBTool {
return &TimescaleDBTool{
name: "timescaledb",
description: "Perform TimescaleDB operations",
}
}
// GetName returns the name of the tool
func (t *TimescaleDBTool) GetName() string {
return t.name
}
// GetDescription returns the description of the tool
func (t *TimescaleDBTool) GetDescription(dbID string) string {
if dbID == "" {
return t.description
}
return fmt.Sprintf("%s on %s", t.description, dbID)
}
// CreateTool creates the TimescaleDB tool
func (t *TimescaleDBTool) CreateTool(name string, dbID string) interface{} {
// Create main tool that describes the available operations
mainTool := cortextools.NewTool(
name,
cortextools.WithDescription(t.GetDescription(dbID)),
cortextools.WithString("operation",
cortextools.Description("TimescaleDB operation to perform"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The table to perform the operation on"),
),
)
return mainTool
}
// CreateHypertableTool creates a specific tool for hypertable creation
func (t *TimescaleDBTool) CreateHypertableTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Create TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'create_hypertable'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The table to convert to a hypertable"),
cortextools.Required(),
),
cortextools.WithString("time_column",
cortextools.Description("The timestamp column for the hypertable"),
cortextools.Required(),
),
cortextools.WithString("chunk_time_interval",
cortextools.Description("Time interval for chunks (e.g., '1 day')"),
),
cortextools.WithString("partitioning_column",
cortextools.Description("Optional column for space partitioning"),
),
cortextools.WithBoolean("if_not_exists",
cortextools.Description("Skip if hypertable already exists"),
),
)
}
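// Illustrative parameter map for the hypertable tool above (table and column
// names are examples):
//
//	{"operation": "create_hypertable", "target_table": "metrics",
//	 "time_column": "ts", "chunk_time_interval": "1 day", "if_not_exists": true}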
// CreateListHypertablesTool creates a specific tool for listing hypertables
func (t *TimescaleDBTool) CreateListHypertablesTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("List TimescaleDB hypertables on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'list_hypertables'"),
cortextools.Required(),
),
)
}
// CreateCompressionEnableTool creates a tool for enabling compression on a hypertable
func (t *TimescaleDBTool) CreateCompressionEnableTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Enable compression on TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'enable_compression'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to enable compression on"),
cortextools.Required(),
),
cortextools.WithString("after",
cortextools.Description("Time interval after which to compress chunks (e.g., '7 days')"),
),
)
}
// CreateCompressionDisableTool creates a tool for disabling compression on a hypertable
func (t *TimescaleDBTool) CreateCompressionDisableTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Disable compression on TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'disable_compression'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to disable compression on"),
cortextools.Required(),
),
)
}
// CreateCompressionPolicyAddTool creates a tool for adding a compression policy
func (t *TimescaleDBTool) CreateCompressionPolicyAddTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Add compression policy to TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'add_compression_policy'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to add compression policy to"),
cortextools.Required(),
),
cortextools.WithString("interval",
cortextools.Description("Time interval after which to compress chunks (e.g., '30 days')"),
cortextools.Required(),
),
cortextools.WithString("segment_by",
cortextools.Description("Column to use for segmenting data during compression"),
),
cortextools.WithString("order_by",
cortextools.Description("Column(s) to use for ordering data during compression"),
),
)
}
// CreateCompressionPolicyRemoveTool creates a tool for removing a compression policy
func (t *TimescaleDBTool) CreateCompressionPolicyRemoveTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Remove compression policy from TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'remove_compression_policy'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to remove compression policy from"),
cortextools.Required(),
),
)
}
// CreateCompressionSettingsTool creates a tool for retrieving compression settings
func (t *TimescaleDBTool) CreateCompressionSettingsTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Get compression settings for TimescaleDB hypertable on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'get_compression_settings'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to get compression settings for"),
cortextools.Required(),
),
)
}
// CreateRetentionPolicyTool creates a specific tool for managing retention policies
func (t *TimescaleDBTool) CreateRetentionPolicyTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Manage TimescaleDB retention policies on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be one of: add_retention_policy, remove_retention_policy, get_retention_policy"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The hypertable to manage retention policy for"),
cortextools.Required(),
),
cortextools.WithString("retention_interval",
cortextools.Description("Time interval for data retention (e.g., '30 days', '6 months')"),
),
)
}
// CreateTimeSeriesQueryTool creates a specific tool for time-series queries
func (t *TimescaleDBTool) CreateTimeSeriesQueryTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Execute time-series queries on TimescaleDB %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'time_series_query'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The table to query"),
cortextools.Required(),
),
cortextools.WithString("time_column",
cortextools.Description("The timestamp column for time bucketing"),
cortextools.Required(),
),
cortextools.WithString("bucket_interval",
cortextools.Description("Time bucket interval (e.g., '1 hour', '1 day')"),
cortextools.Required(),
),
cortextools.WithString("start_time",
cortextools.Description("Start of time range (e.g., '2023-01-01')"),
),
cortextools.WithString("end_time",
cortextools.Description("End of time range (e.g., '2023-01-31')"),
),
cortextools.WithString("aggregations",
cortextools.Description("Comma-separated list of aggregations (e.g., 'AVG(temp),MAX(temp),COUNT(*)')"),
),
cortextools.WithString("where_condition",
cortextools.Description("Additional WHERE conditions"),
),
cortextools.WithString("group_by",
cortextools.Description("Additional GROUP BY columns (comma-separated)"),
),
cortextools.WithString("order_by",
cortextools.Description("Order by clause (default: time_bucket)"),
),
cortextools.WithString("window_functions",
cortextools.Description("Window functions to include (e.g. 'LAG(value) OVER (ORDER BY time_bucket) AS prev_value')"),
),
cortextools.WithString("limit",
cortextools.Description("Maximum number of rows to return"),
),
cortextools.WithBoolean("format_pretty",
cortextools.Description("Whether to format the response in a more readable way"),
),
)
}
// CreateTimeSeriesAnalyzeTool creates a specific tool for analyzing time-series data
func (t *TimescaleDBTool) CreateTimeSeriesAnalyzeTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Analyze time-series data patterns on TimescaleDB %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'analyze_time_series'"),
cortextools.Required(),
),
cortextools.WithString("target_table",
cortextools.Description("The table to analyze"),
cortextools.Required(),
),
cortextools.WithString("time_column",
cortextools.Description("The timestamp column"),
cortextools.Required(),
),
cortextools.WithString("start_time",
cortextools.Description("Start of time range (e.g., '2023-01-01')"),
),
cortextools.WithString("end_time",
cortextools.Description("End of time range (e.g., '2023-01-31')"),
),
)
}
// CreateContinuousAggregateTool creates a specific tool for creating continuous aggregates
func (t *TimescaleDBTool) CreateContinuousAggregateTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Create TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'create_continuous_aggregate'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name for the continuous aggregate view"),
cortextools.Required(),
),
cortextools.WithString("source_table",
cortextools.Description("Source table with raw data"),
cortextools.Required(),
),
cortextools.WithString("time_column",
cortextools.Description("Time column to bucket"),
cortextools.Required(),
),
cortextools.WithString("bucket_interval",
cortextools.Description("Time bucket interval (e.g., '1 hour', '1 day')"),
cortextools.Required(),
),
cortextools.WithString("aggregations",
cortextools.Description("Comma-separated list of aggregations (e.g., 'AVG(temp),MAX(temp),COUNT(*)')"),
),
cortextools.WithString("where_condition",
cortextools.Description("WHERE condition to filter source data"),
),
cortextools.WithBoolean("with_data",
cortextools.Description("Whether to materialize data immediately"),
),
cortextools.WithBoolean("refresh_policy",
cortextools.Description("Whether to add a refresh policy"),
),
cortextools.WithString("refresh_interval",
cortextools.Description("Refresh interval (e.g., '1 day')"),
),
)
}
// CreateContinuousAggregateRefreshTool creates a specific tool for refreshing continuous aggregates
func (t *TimescaleDBTool) CreateContinuousAggregateRefreshTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Refresh TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'refresh_continuous_aggregate'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name of the continuous aggregate view"),
cortextools.Required(),
),
cortextools.WithString("start_time",
cortextools.Description("Start of time range to refresh (e.g., '2023-01-01')"),
),
cortextools.WithString("end_time",
cortextools.Description("End of time range to refresh (e.g., '2023-01-31')"),
),
)
}
// CreateContinuousAggregateDropTool creates a specific tool for dropping continuous aggregates
func (t *TimescaleDBTool) CreateContinuousAggregateDropTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Drop TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'drop_continuous_aggregate'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name of the continuous aggregate view to drop"),
cortextools.Required(),
),
cortextools.WithBoolean("cascade",
cortextools.Description("Whether to drop dependent objects as well"),
),
)
}
// CreateContinuousAggregateListTool creates a specific tool for listing continuous aggregates
func (t *TimescaleDBTool) CreateContinuousAggregateListTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("List TimescaleDB continuous aggregates on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'list_continuous_aggregates'"),
cortextools.Required(),
),
)
}
// CreateContinuousAggregateInfoTool creates a specific tool for getting continuous aggregate information
func (t *TimescaleDBTool) CreateContinuousAggregateInfoTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Get information about a TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'get_continuous_aggregate_info'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name of the continuous aggregate view"),
cortextools.Required(),
),
)
}
// CreateContinuousAggregatePolicyAddTool creates a specific tool for adding a refresh policy
func (t *TimescaleDBTool) CreateContinuousAggregatePolicyAddTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Add refresh policy to TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'add_continuous_aggregate_policy'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name of the continuous aggregate view"),
cortextools.Required(),
),
cortextools.WithString("start_offset",
cortextools.Description("How far to look back for data to refresh (e.g., '1 week')"),
cortextools.Required(),
),
cortextools.WithString("end_offset",
cortextools.Description("How recent of data to refresh (e.g., '1 hour')"),
cortextools.Required(),
),
cortextools.WithString("schedule_interval",
cortextools.Description("How often to refresh data (e.g., '1 day')"),
cortextools.Required(),
),
)
}
// CreateContinuousAggregatePolicyRemoveTool creates a specific tool for removing a refresh policy
func (t *TimescaleDBTool) CreateContinuousAggregatePolicyRemoveTool(name string, dbID string) interface{} {
return cortextools.NewTool(
name,
cortextools.WithDescription(fmt.Sprintf("Remove refresh policy from TimescaleDB continuous aggregate on %s", dbID)),
cortextools.WithString("operation",
cortextools.Description("The operation must be 'remove_continuous_aggregate_policy'"),
cortextools.Required(),
),
cortextools.WithString("view_name",
cortextools.Description("Name of the continuous aggregate view"),
cortextools.Required(),
),
)
}
// HandleRequest handles a tool request
func (t *TimescaleDBTool) HandleRequest(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract parameters from the request
if request.Parameters == nil {
return nil, fmt.Errorf("missing parameters")
}
operation, ok := request.Parameters["operation"].(string)
if !ok || operation == "" {
return nil, fmt.Errorf("operation parameter is required")
}
// Route to the appropriate handler based on the operation
switch strings.ToLower(operation) {
case "create_hypertable":
return t.handleCreateHypertable(ctx, request, dbID, useCase)
case "list_hypertables":
return t.handleListHypertables(ctx, request, dbID, useCase)
case "enable_compression":
return t.handleEnableCompression(ctx, request, dbID, useCase)
case "disable_compression":
return t.handleDisableCompression(ctx, request, dbID, useCase)
case "add_compression_policy":
return t.handleAddCompressionPolicy(ctx, request, dbID, useCase)
case "remove_compression_policy":
return t.handleRemoveCompressionPolicy(ctx, request, dbID, useCase)
case "get_compression_settings":
return t.handleGetCompressionSettings(ctx, request, dbID, useCase)
case "add_retention_policy":
return t.handleAddRetentionPolicy(ctx, request, dbID, useCase)
case "remove_retention_policy":
return t.handleRemoveRetentionPolicy(ctx, request, dbID, useCase)
case "get_retention_policy":
return t.handleGetRetentionPolicy(ctx, request, dbID, useCase)
case "time_series_query":
return t.handleTimeSeriesQuery(ctx, request, dbID, useCase)
case "analyze_time_series":
return t.handleTimeSeriesAnalyze(ctx, request, dbID, useCase)
case "create_continuous_aggregate":
return t.handleCreateContinuousAggregate(ctx, request, dbID, useCase)
case "refresh_continuous_aggregate":
return t.handleRefreshContinuousAggregate(ctx, request, dbID, useCase)
case "drop_continuous_aggregate":
return t.handleDropContinuousAggregate(ctx, request, dbID, useCase)
case "list_continuous_aggregates":
return t.handleListContinuousAggregates(ctx, request, dbID, useCase)
case "get_continuous_aggregate_info":
return t.handleGetContinuousAggregateInfo(ctx, request, dbID, useCase)
case "add_continuous_aggregate_policy":
return t.handleAddContinuousAggregatePolicy(ctx, request, dbID, useCase)
case "remove_continuous_aggregate_policy":
return t.handleRemoveContinuousAggregatePolicy(ctx, request, dbID, useCase)
default:
return map[string]interface{}{"message": fmt.Sprintf("Operation '%s' not implemented yet", operation)}, nil
}
}
// handleCreateHypertable handles the create_hypertable operation
func (t *TimescaleDBTool) handleCreateHypertable(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
timeColumn, ok := request.Parameters["time_column"].(string)
if !ok || timeColumn == "" {
return nil, fmt.Errorf("time_column parameter is required")
}
// Extract optional parameters
chunkTimeInterval := getStringParam(request.Parameters, "chunk_time_interval")
partitioningColumn := getStringParam(request.Parameters, "partitioning_column")
ifNotExists := getBoolParam(request.Parameters, "if_not_exists")
// Build the SQL statement to create a hypertable
sql := buildCreateHypertableSQL(targetTable, timeColumn, chunkTimeInterval, partitioningColumn, ifNotExists)
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to create hypertable: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully created hypertable '%s' with time column '%s'", targetTable, timeColumn),
"details": result,
}, nil
}
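// For the example request above, buildCreateHypertableSQL (defined elsewhere
// in this package) would be expected to emit SQL along these lines
// (illustrative, not the verified builder output):
//
//	SELECT create_hypertable('metrics', 'ts', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);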
// handleListHypertables handles the list_hypertables operation
func (t *TimescaleDBTool) handleListHypertables(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL query to list hypertables
// Note: time_column is resolved via a correlated subquery so that the
// aggregated SELECT list stays valid under PostgreSQL's GROUP BY rules
sql := `
SELECT h.table_name, h.schema_name,
(
SELECT column_name FROM _timescaledb_catalog.dimension
WHERE hypertable_id = h.id
AND (column_type = 'TIMESTAMP' OR column_type = 'TIMESTAMPTZ')
LIMIT 1
) as time_column,
count(d.id) as num_dimensions,
(
SELECT column_name FROM _timescaledb_catalog.dimension
WHERE hypertable_id = h.id AND column_type != 'TIMESTAMP'
AND column_type != 'TIMESTAMPTZ'
LIMIT 1
) as space_column
FROM _timescaledb_catalog.hypertable h
JOIN _timescaledb_catalog.dimension d ON h.id = d.hypertable_id
GROUP BY h.id, h.table_name, h.schema_name
`
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to list hypertables: %w", err)
}
return map[string]interface{}{
"message": "Successfully retrieved hypertables list",
"details": result,
}, nil
}
// handleEnableCompression handles the enable_compression operation
func (t *TimescaleDBTool) handleEnableCompression(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Extract optional interval parameter
afterInterval := getStringParam(request.Parameters, "after")
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL statement to enable compression
sql := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = true)", targetTable)
// Execute the statement
_, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to enable compression: %w", err)
}
var message string
// If interval is specified, add compression policy
if afterInterval != "" {
// Build the SQL statement for compression policy
policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s')", targetTable, afterInterval)
// Execute the statement
_, err = useCase.ExecuteStatement(ctx, dbID, policySQL, nil)
if err != nil {
return nil, fmt.Errorf("failed to add compression policy: %w", err)
}
message = fmt.Sprintf("Successfully enabled compression on hypertable '%s' with automatic compression after '%s'", targetTable, afterInterval)
} else {
message = fmt.Sprintf("Successfully enabled compression on hypertable '%s'", targetTable)
}
return map[string]interface{}{
"message": message,
}, nil
}
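// Statements generated by the handler above for target_table = 'metrics' and
// after = '7 days', taken directly from the fmt.Sprintf templates:
//
//	ALTER TABLE metrics SET (timescaledb.compress = true);
//	SELECT add_compression_policy('metrics', INTERVAL '7 days');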
// handleDisableCompression handles the disable_compression operation
func (t *TimescaleDBTool) handleDisableCompression(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// First, find and remove any existing compression policy
policyQuery := fmt.Sprintf(
"SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_compression'",
targetTable,
)
policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to check for existing compression policy: %w", err)
}
// Check if a policy exists and remove it
if policyResult != "" && policyResult != "[]" {
// Parse the JSON result
var policies []map[string]interface{}
if err := json.Unmarshal([]byte(policyResult), &policies); err != nil {
return nil, fmt.Errorf("failed to parse policy result: %w", err)
}
if len(policies) > 0 && policies[0]["job_id"] != nil {
// Remove the policy
jobID := policies[0]["job_id"]
removePolicyQuery := fmt.Sprintf("SELECT remove_compression_policy(%v)", jobID)
_, err = useCase.ExecuteStatement(ctx, dbID, removePolicyQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to remove compression policy: %w", err)
}
}
}
// Build the SQL statement to disable compression
sql := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = false)", targetTable)
// Execute the statement
_, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to disable compression: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully disabled compression on hypertable '%s'", targetTable),
}, nil
}
// handleAddCompressionPolicy handles the add_compression_policy operation
func (t *TimescaleDBTool) handleAddCompressionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
interval, ok := request.Parameters["interval"].(string)
if !ok || interval == "" {
return nil, fmt.Errorf("interval parameter is required")
}
// Extract optional parameters
segmentBy := getStringParam(request.Parameters, "segment_by")
orderBy := getStringParam(request.Parameters, "order_by")
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// First, check if compression is enabled
compressionQuery := fmt.Sprintf(
"SELECT compress FROM timescaledb_information.hypertables WHERE hypertable_name = '%s'",
targetTable,
)
compressionResult, err := useCase.ExecuteStatement(ctx, dbID, compressionQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to check compression status: %w", err)
}
// Parse the result to check if compression is enabled
var hypertables []map[string]interface{}
if err := json.Unmarshal([]byte(compressionResult), &hypertables); err != nil {
return nil, fmt.Errorf("failed to parse hypertable info: %w", err)
}
if len(hypertables) == 0 {
return nil, fmt.Errorf("table '%s' is not a hypertable", targetTable)
}
isCompressed := false
if compress, ok := hypertables[0]["compress"]; ok && compress != nil {
isCompressed = fmt.Sprintf("%v", compress) == "true"
}
// If compression isn't enabled, enable it first
if !isCompressed {
enableSQL := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = true)", targetTable)
_, err = useCase.ExecuteStatement(ctx, dbID, enableSQL, nil)
if err != nil {
return nil, fmt.Errorf("failed to enable compression: %w", err)
}
}
// Build the compression policy SQL
var policyQueryBuilder strings.Builder
policyQueryBuilder.WriteString(fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s'", targetTable, interval))
if segmentBy != "" {
policyQueryBuilder.WriteString(fmt.Sprintf(", segmentby => '%s'", segmentBy))
}
if orderBy != "" {
policyQueryBuilder.WriteString(fmt.Sprintf(", orderby => '%s'", orderBy))
}
policyQueryBuilder.WriteString(")")
// Execute the statement to add the compression policy
_, err = useCase.ExecuteStatement(ctx, dbID, policyQueryBuilder.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to add compression policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully added compression policy to hypertable '%s'", targetTable),
}, nil
}
// handleRemoveCompressionPolicy handles the remove_compression_policy operation
func (t *TimescaleDBTool) handleRemoveCompressionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Find the policy ID
policyQuery := fmt.Sprintf(
"SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_compression'",
targetTable,
)
policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to find compression policy: %w", err)
}
// Parse the result to get the job ID
var policies []map[string]interface{}
if err := json.Unmarshal([]byte(policyResult), &policies); err != nil {
return nil, fmt.Errorf("failed to parse policy info: %w", err)
}
if len(policies) == 0 {
return map[string]interface{}{
"message": fmt.Sprintf("No compression policy found for hypertable '%s'", targetTable),
}, nil
}
jobID := policies[0]["job_id"]
if jobID == nil {
return nil, fmt.Errorf("invalid job ID for compression policy")
}
// Remove the policy
removeSQL := fmt.Sprintf("SELECT remove_compression_policy(%v)", jobID)
_, err = useCase.ExecuteStatement(ctx, dbID, removeSQL, nil)
if err != nil {
return nil, fmt.Errorf("failed to remove compression policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully removed compression policy from hypertable '%s'", targetTable),
}, nil
}
// handleGetCompressionSettings handles the get_compression_settings operation
func (t *TimescaleDBTool) handleGetCompressionSettings(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Check if the table is a hypertable and has compression enabled
hypertableQuery := fmt.Sprintf(
"SELECT compress FROM timescaledb_information.hypertables WHERE hypertable_name = '%s'",
targetTable,
)
hypertableResult, err := useCase.ExecuteStatement(ctx, dbID, hypertableQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to check hypertable info: %w", err)
}
// Parse the result
var hypertables []map[string]interface{}
if err := json.Unmarshal([]byte(hypertableResult), &hypertables); err != nil {
return nil, fmt.Errorf("failed to parse hypertable info: %w", err)
}
if len(hypertables) == 0 {
return nil, fmt.Errorf("table '%s' is not a hypertable", targetTable)
}
// Create settings object
settings := map[string]interface{}{
"hypertable_name": targetTable,
"compression_enabled": false,
"segment_by": nil,
"order_by": nil,
"chunk_time_interval": nil,
"compression_interval": nil,
}
isCompressed := false
if compress, ok := hypertables[0]["compress"]; ok && compress != nil {
isCompressed = fmt.Sprintf("%v", compress) == "true"
}
settings["compression_enabled"] = isCompressed
if isCompressed {
// Get compression settings
compressionQuery := fmt.Sprintf(
"SELECT segmentby, orderby FROM timescaledb_information.compression_settings WHERE hypertable_name = '%s'",
targetTable,
)
compressionResult, err := useCase.ExecuteStatement(ctx, dbID, compressionQuery, nil)
if err != nil {
return nil, fmt.Errorf("failed to get compression settings: %w", err)
}
var compressionSettings []map[string]interface{}
if err := json.Unmarshal([]byte(compressionResult), &compressionSettings); err != nil {
return nil, fmt.Errorf("failed to parse compression settings: %w", err)
}
if len(compressionSettings) > 0 {
if segmentBy, ok := compressionSettings[0]["segmentby"]; ok && segmentBy != nil {
settings["segment_by"] = segmentBy
}
if orderBy, ok := compressionSettings[0]["orderby"]; ok && orderBy != nil {
settings["order_by"] = orderBy
}
}
// Get policy information
policyQuery := fmt.Sprintf(
"SELECT j.schedule_interval, d.time_interval AS chunk_time_interval "+
"FROM timescaledb_information.jobs j "+
"JOIN timescaledb_information.dimensions d ON j.hypertable_name = d.hypertable_name "+
"WHERE j.hypertable_name = '%s' AND j.proc_name = 'policy_compression'",
targetTable,
)
policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
if err == nil {
var policyInfo []map[string]interface{}
if err := json.Unmarshal([]byte(policyResult), &policyInfo); err != nil {
return nil, fmt.Errorf("failed to parse policy info: %w", err)
}
if len(policyInfo) > 0 {
if interval, ok := policyInfo[0]["schedule_interval"]; ok && interval != nil {
settings["compression_interval"] = interval
}
if chunkInterval, ok := policyInfo[0]["chunk_time_interval"]; ok && chunkInterval != nil {
settings["chunk_time_interval"] = chunkInterval
}
}
}
}
return map[string]interface{}{
"message": fmt.Sprintf("Retrieved compression settings for hypertable '%s'", targetTable),
"settings": settings,
}, nil
}
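// Example (illustrative; names and values are hypothetical): for a compressed
// hypertable the returned settings map looks like
//
//	{"hypertable_name": "metrics", "compression_enabled": true,
//	 "segment_by": "device_id", "order_by": "time DESC",
//	 "compression_interval": "1 day", "chunk_time_interval": "7 days"}
//
// with any value that could not be determined left as nil.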
// handleAddRetentionPolicy handles the add_retention_policy operation
func (t *TimescaleDBTool) handleAddRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
retentionInterval, ok := request.Parameters["retention_interval"].(string)
if !ok || retentionInterval == "" {
return nil, fmt.Errorf("retention_interval parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL statement to add a retention policy
sql := fmt.Sprintf("SELECT add_retention_policy('%s', INTERVAL '%s')", targetTable, retentionInterval)
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to add retention policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully added retention policy to '%s' with interval '%s'", targetTable, retentionInterval),
"details": result,
}, nil
}
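// Example (illustrative; names are hypothetical): target_table="metrics" and
// retention_interval="30 days" yield
//
//	SELECT add_retention_policy('metrics', INTERVAL '30 days')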
// handleRemoveRetentionPolicy handles the remove_retention_policy operation
func (t *TimescaleDBTool) handleRemoveRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// First, find the policy job ID
findPolicySQL := fmt.Sprintf(
"SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention'",
targetTable,
)
// Execute the statement to find the policy
policyResult, err := useCase.ExecuteStatement(ctx, dbID, findPolicySQL, nil)
if err != nil {
return nil, fmt.Errorf("failed to find retention policy: %w", err)
}
// Check if we found a policy
if policyResult == "[]" || policyResult == "" {
return map[string]interface{}{
"message": fmt.Sprintf("No retention policy found for table '%s'", targetTable),
}, nil
}
// Remove the policy, using a subquery to look up the retention job ID
removeSQL := fmt.Sprintf(
"SELECT remove_retention_policy((SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention' LIMIT 1))",
targetTable,
)
// Execute the statement to remove the policy
result, err := useCase.ExecuteStatement(ctx, dbID, removeSQL, nil)
if err != nil {
return nil, fmt.Errorf("failed to remove retention policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully removed retention policy from '%s'", targetTable),
"details": result,
}, nil
}
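// Example (illustrative; names are hypothetical): for target_table="metrics"
// the removal statement embeds the job lookup as a subquery:
//
//	SELECT remove_retention_policy((SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = 'metrics' AND proc_name = 'policy_retention' LIMIT 1))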
// handleGetRetentionPolicy handles the get_retention_policy operation
func (t *TimescaleDBTool) handleGetRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL query to get retention policy details
sql := fmt.Sprintf(`
SELECT
'%s' as hypertable_name,
j.schedule_interval as retention_interval,
CASE WHEN j.job_id IS NOT NULL THEN true ELSE false END as retention_enabled
FROM
timescaledb_information.jobs j
JOIN
timescaledb_information.job_stats js ON j.job_id = js.job_id
WHERE
j.hypertable_name = '%s' AND j.proc_name = 'policy_retention'
`, targetTable, targetTable)
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to get retention policy: %w", err)
}
// Check if we got any results
if result == "[]" || result == "" {
// No retention policy found, return a default structure
return map[string]interface{}{
"message": fmt.Sprintf("No retention policy found for table '%s'", targetTable),
"details": fmt.Sprintf(`[{"hypertable_name":"%s","retention_enabled":false}]`, targetTable),
}, nil
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully retrieved retention policy for '%s'", targetTable),
"details": result,
}, nil
}
// handleTimeSeriesQuery handles the time_series_query operation
func (t *TimescaleDBTool) handleTimeSeriesQuery(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
timeColumn, ok := request.Parameters["time_column"].(string)
if !ok || timeColumn == "" {
return nil, fmt.Errorf("time_column parameter is required")
}
bucketInterval, ok := request.Parameters["bucket_interval"].(string)
if !ok || bucketInterval == "" {
return nil, fmt.Errorf("bucket_interval parameter is required")
}
// Extract optional parameters
startTimeStr := getStringParam(request.Parameters, "start_time")
endTimeStr := getStringParam(request.Parameters, "end_time")
aggregations := getStringParam(request.Parameters, "aggregations")
whereCondition := getStringParam(request.Parameters, "where_condition")
groupBy := getStringParam(request.Parameters, "group_by")
orderBy := getStringParam(request.Parameters, "order_by")
windowFunctions := getStringParam(request.Parameters, "window_functions")
limitStr := getStringParam(request.Parameters, "limit")
formatPretty := getBoolParam(request.Parameters, "format_pretty")
// Set default values for optional parameters
if aggregations == "" {
aggregations = "count(*) as count"
}
// Build WHERE clause
whereClause := ""
if startTimeStr != "" && endTimeStr != "" {
whereClause = fmt.Sprintf("%s BETWEEN '%s' AND '%s'", timeColumn, startTimeStr, endTimeStr)
if whereCondition != "" {
whereClause = fmt.Sprintf("%s AND %s", whereClause, whereCondition)
}
} else if whereCondition != "" {
whereClause = whereCondition
} else {
whereClause = "1=1" // Always true if no conditions
}
// Set default group by if not provided
if groupBy == "" {
groupBy = "time_bucket"
} else {
groupBy = fmt.Sprintf("time_bucket, %s", groupBy)
}
// Set default order by if not provided
if orderBy == "" {
orderBy = "time_bucket"
}
// Set default limit if not provided
limit := 1000 // Default limit
if limitStr != "" {
if parsedLimit, err := strconv.Atoi(limitStr); err == nil && parsedLimit > 0 {
limit = parsedLimit
}
}
// Build the base SQL query
var sql string
if windowFunctions == "" {
// Simple query without window functions
sql = fmt.Sprintf(`
SELECT
time_bucket('%s', %s) as time_bucket,
%s
FROM
%s
WHERE
%s
GROUP BY
%s
ORDER BY
%s
LIMIT %d
`, bucketInterval, timeColumn, aggregations, targetTable, whereClause, groupBy, orderBy, limit)
} else {
// Query with window functions - wrap the aggregated query in a subquery so
// the window functions can reference its result columns without re-aggregating
sql = fmt.Sprintf(`
SELECT
sub.*,
%s
FROM (
SELECT
time_bucket('%s', %s) as time_bucket,
%s
FROM
%s
WHERE
%s
GROUP BY
%s
) AS sub
ORDER BY
%s
LIMIT %d
`, windowFunctions, bucketInterval, timeColumn, aggregations, targetTable, whereClause, groupBy, orderBy, limit)
}
// Execute the query
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to execute time-series query: %w", err)
}
// Generate the response
response := map[string]interface{}{
"message": "Successfully retrieved time-series data",
"details": result,
}
// Add metadata if pretty format is requested
if formatPretty {
// Try to parse the result JSON for better presentation
var resultData []map[string]interface{}
if err := json.Unmarshal([]byte(result), &resultData); err == nil {
// Add statistics about the data
numRows := len(resultData)
response = addMetadata(response, "num_rows", numRows)
response = addMetadata(response, "time_bucket_interval", bucketInterval)
if numRows > 0 {
// Extract time range from the data if available
if firstBucket, ok := resultData[0]["time_bucket"].(string); ok {
response = addMetadata(response, "first_bucket", firstBucket)
}
if lastBucket, ok := resultData[numRows-1]["time_bucket"].(string); ok {
response = addMetadata(response, "last_bucket", lastBucket)
}
}
}
}
return response, nil
}
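// Example (illustrative; names are hypothetical): target_table="metrics",
// time_column="time", bucket_interval="1 hour" and
// aggregations="avg(cpu) as avg_cpu", with all other options left unset,
// produce (whitespace aside):
//
//	SELECT time_bucket('1 hour', time) as time_bucket, avg(cpu) as avg_cpu
//	FROM metrics WHERE 1=1
//	GROUP BY time_bucket ORDER BY time_bucket LIMIT 1000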
// handleTimeSeriesAnalyze handles the analyze_time_series operation
func (t *TimescaleDBTool) handleTimeSeriesAnalyze(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
targetTable, ok := request.Parameters["target_table"].(string)
if !ok || targetTable == "" {
return nil, fmt.Errorf("target_table parameter is required")
}
timeColumn, ok := request.Parameters["time_column"].(string)
if !ok || timeColumn == "" {
return nil, fmt.Errorf("time_column parameter is required")
}
// Extract optional parameters
startTimeStr := getStringParam(request.Parameters, "start_time")
endTimeStr := getStringParam(request.Parameters, "end_time")
// Build WHERE clause
whereClause := ""
if startTimeStr != "" && endTimeStr != "" {
whereClause = fmt.Sprintf("WHERE %s BETWEEN '%s' AND '%s'", timeColumn, startTimeStr, endTimeStr)
}
// Build the SQL query for basic time series analysis
sql := fmt.Sprintf(`
SELECT
COUNT(*) as row_count,
MIN(%s) as min_time,
MAX(%s) as max_time,
(MAX(%s) - MIN(%s)) as time_span,
COUNT(DISTINCT date_trunc('day', %s)) as unique_days
FROM
%s
%s
`, timeColumn, timeColumn, timeColumn, timeColumn, timeColumn, targetTable, whereClause)
// Execute the query
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to analyze time-series data: %w", err)
}
return map[string]interface{}{
"message": "Successfully analyzed time-series data",
"details": result,
}, nil
}
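// Example (illustrative; names are hypothetical): target_table="metrics" and
// time_column="time" yield (whitespace aside):
//
//	SELECT COUNT(*) as row_count, MIN(time) as min_time, MAX(time) as max_time,
//	(MAX(time) - MIN(time)) as time_span,
//	COUNT(DISTINCT date_trunc('day', time)) as unique_days
//	FROM metrics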
// handleCreateContinuousAggregate handles the create_continuous_aggregate operation
func (t *TimescaleDBTool) handleCreateContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
sourceTable, ok := request.Parameters["source_table"].(string)
if !ok || sourceTable == "" {
return nil, fmt.Errorf("source_table parameter is required")
}
timeColumn, ok := request.Parameters["time_column"].(string)
if !ok || timeColumn == "" {
return nil, fmt.Errorf("time_column parameter is required")
}
bucketInterval, ok := request.Parameters["bucket_interval"].(string)
if !ok || bucketInterval == "" {
return nil, fmt.Errorf("bucket_interval parameter is required")
}
// Extract optional parameters
aggregationsStr := getStringParam(request.Parameters, "aggregations")
whereCondition := getStringParam(request.Parameters, "where_condition")
withData := getBoolParam(request.Parameters, "with_data")
refreshPolicy := getBoolParam(request.Parameters, "refresh_policy")
refreshInterval := getStringParam(request.Parameters, "refresh_interval")
// Parse aggregations from comma-separated string
var aggregationsParts []string
if aggregationsStr != "" {
aggregationsParts = strings.Split(aggregationsStr, ",")
} else {
// Default aggregation if none specified
aggregationsParts = []string{"COUNT(*) AS count"}
}
// Build the SQL statement to create a continuous aggregate
var builder strings.Builder
builder.WriteString("CREATE MATERIALIZED VIEW ")
builder.WriteString(viewName)
builder.WriteString("\nAS SELECT\n time_bucket('")
builder.WriteString(bucketInterval)
builder.WriteString("', ")
builder.WriteString(timeColumn)
builder.WriteString(") AS time_bucket")
// Add aggregations
for _, agg := range aggregationsParts {
builder.WriteString(",\n ")
builder.WriteString(strings.TrimSpace(agg))
}
// Add FROM clause
builder.WriteString("\nFROM ")
builder.WriteString(sourceTable)
// Add WHERE clause if specified
if whereCondition != "" {
builder.WriteString("\nWHERE ")
builder.WriteString(whereCondition)
}
// Add GROUP BY clause
builder.WriteString("\nGROUP BY time_bucket")
// Add WITH DATA or WITH NO DATA
if withData {
builder.WriteString("\nWITH DATA")
} else {
builder.WriteString("\nWITH NO DATA")
}
// Execute the statement
_, err := useCase.ExecuteStatement(ctx, dbID, builder.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to create continuous aggregate: %w", err)
}
// Add refresh policy if requested; the start and end offsets are fixed here at 1 week and 1 hour
if refreshPolicy && refreshInterval != "" {
policySQL := fmt.Sprintf("SELECT add_continuous_aggregate_policy('%s', start_offset => INTERVAL '1 week', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '%s')", viewName, refreshInterval)
_, err := useCase.ExecuteStatement(ctx, dbID, policySQL, nil)
if err != nil {
return map[string]interface{}{
"message": fmt.Sprintf("Created continuous aggregate '%s' but failed to add refresh policy: %s", viewName, err.Error()),
}, nil
}
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully created continuous aggregate '%s'", viewName),
"sql": builder.String(),
}, nil
}
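// Example (illustrative; names are hypothetical): view_name="metrics_hourly",
// source_table="metrics", time_column="time", bucket_interval="1 hour" and
// aggregations="avg(cpu) AS avg_cpu" generate roughly:
//
//	CREATE MATERIALIZED VIEW metrics_hourly
//	AS SELECT
//	time_bucket('1 hour', time) AS time_bucket,
//	avg(cpu) AS avg_cpu
//	FROM metrics
//	GROUP BY time_bucket
//	WITH NO DATA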
// handleRefreshContinuousAggregate handles the refresh_continuous_aggregate operation
func (t *TimescaleDBTool) handleRefreshContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
// Extract optional parameters
startTimeStr := getStringParam(request.Parameters, "start_time")
endTimeStr := getStringParam(request.Parameters, "end_time")
// Build the SQL statement to refresh a continuous aggregate
var sql string
if startTimeStr != "" && endTimeStr != "" {
sql = fmt.Sprintf("CALL refresh_continuous_aggregate('%s', '%s', '%s')",
viewName, startTimeStr, endTimeStr)
} else {
sql = fmt.Sprintf("CALL refresh_continuous_aggregate('%s', NULL, NULL)", viewName)
}
// Execute the statement
_, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to refresh continuous aggregate: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully refreshed continuous aggregate '%s'", viewName),
}, nil
}
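// Example (illustrative; the view name is hypothetical): when start_time and
// end_time are omitted the whole view is refreshed:
//
//	CALL refresh_continuous_aggregate('metrics_hourly', NULL, NULL)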
// handleDropContinuousAggregate handles the drop_continuous_aggregate operation
func (t *TimescaleDBTool) handleDropContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
// Extract optional parameters
cascade := getBoolParam(request.Parameters, "cascade")
// Build the SQL statement to drop a continuous aggregate
sql := fmt.Sprintf("DROP MATERIALIZED VIEW %s", viewName)
if cascade {
sql += " CASCADE"
}
// Execute the statement
_, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to drop continuous aggregate: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully dropped continuous aggregate '%s'", viewName),
}, nil
}
// handleListContinuousAggregates handles the list_continuous_aggregates operation
func (t *TimescaleDBTool) handleListContinuousAggregates(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL query to list continuous aggregates
sql := `
SELECT view_name, view_schema, materialized_only, compression_enabled, view_definition
FROM timescaledb_information.continuous_aggregates
`
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to list continuous aggregates: %w", err)
}
return map[string]interface{}{
"message": "Successfully retrieved continuous aggregates list",
"details": result,
}, nil
}
// handleGetContinuousAggregateInfo handles the get_continuous_aggregate_info operation
func (t *TimescaleDBTool) handleGetContinuousAggregateInfo(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL query to get continuous aggregate information
sql := fmt.Sprintf(`
SELECT
hypertable_name,
view_schema,
view_name,
materialized_only,
compression_enabled,
view_definition
FROM
timescaledb_information.continuous_aggregates
WHERE
view_name = '%s'
`, viewName)
// Execute the statement
result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to get continuous aggregate info: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully retrieved continuous aggregate information for '%s'", viewName),
"details": result,
}, nil
}
// handleAddContinuousAggregatePolicy handles the add_continuous_aggregate_policy operation
func (t *TimescaleDBTool) handleAddContinuousAggregatePolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
startOffset, ok := request.Parameters["start_offset"].(string)
if !ok || startOffset == "" {
return nil, fmt.Errorf("start_offset parameter is required")
}
endOffset, ok := request.Parameters["end_offset"].(string)
if !ok || endOffset == "" {
return nil, fmt.Errorf("end_offset parameter is required")
}
scheduleInterval, ok := request.Parameters["schedule_interval"].(string)
if !ok || scheduleInterval == "" {
return nil, fmt.Errorf("schedule_interval parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL statement to add a continuous aggregate policy
sql := fmt.Sprintf("SELECT add_continuous_aggregate_policy('%s', start_offset => INTERVAL '%s', end_offset => INTERVAL '%s', schedule_interval => INTERVAL '%s')",
viewName, startOffset, endOffset, scheduleInterval)
// Execute the statement
_, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to add continuous aggregate policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully added continuous aggregate policy to '%s'", viewName),
}, nil
}
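// Example (illustrative; the view name is hypothetical): view_name="metrics_hourly",
// start_offset="1 week", end_offset="1 hour" and schedule_interval="30 minutes"
// produce
//
//	SELECT add_continuous_aggregate_policy('metrics_hourly', start_offset => INTERVAL '1 week', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '30 minutes')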
// handleRemoveContinuousAggregatePolicy handles the remove_continuous_aggregate_policy operation
func (t *TimescaleDBTool) handleRemoveContinuousAggregatePolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
// Extract required parameters
viewName, ok := request.Parameters["view_name"].(string)
if !ok || viewName == "" {
return nil, fmt.Errorf("view_name parameter is required")
}
// Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
dbType, err := useCase.GetDatabaseType(dbID)
if err != nil {
return nil, fmt.Errorf("failed to get database type: %w", err)
}
if !strings.Contains(strings.ToLower(dbType), "postgres") {
return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
}
// Build the SQL statement to remove a continuous aggregate policy
sql := fmt.Sprintf("SELECT remove_continuous_aggregate_policy('%s')", viewName)
// Execute the statement
_, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
if err != nil {
return nil, fmt.Errorf("failed to remove continuous aggregate policy: %w", err)
}
return map[string]interface{}{
"message": fmt.Sprintf("Successfully removed continuous aggregate policy from '%s'", viewName),
}, nil
}
// getStringParam safely extracts a string parameter from a parameter map
func getStringParam(params map[string]interface{}, key string) string {
if value, ok := params[key].(string); ok {
return value
}
return ""
}
// getBoolParam safely extracts a boolean parameter from a parameter map
func getBoolParam(params map[string]interface{}, key string) bool {
if value, ok := params[key].(bool); ok {
return value
}
return false
}
// buildCreateHypertableSQL constructs the SQL statement to create a hypertable
func buildCreateHypertableSQL(table, timeColumn, chunkTimeInterval, partitioningColumn string, ifNotExists bool) string {
var args []string
// Add required arguments: table name and time column
args = append(args, fmt.Sprintf("'%s'", table))
args = append(args, fmt.Sprintf("'%s'", timeColumn))
// Build optional parameters
var options []string
if chunkTimeInterval != "" {
options = append(options, fmt.Sprintf("chunk_time_interval => interval '%s'", chunkTimeInterval))
}
if partitioningColumn != "" {
options = append(options, fmt.Sprintf("partitioning_column => '%s'", partitioningColumn))
}
options = append(options, fmt.Sprintf("if_not_exists => %t", ifNotExists))
// Construct the full SQL statement
sql := fmt.Sprintf("SELECT create_hypertable(%s", strings.Join(args, ", "))
if len(options) > 0 {
sql += ", " + strings.Join(options, ", ")
}
sql += ")"
return sql
}
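// Example (illustrative; names are hypothetical):
//
//	buildCreateHypertableSQL("metrics", "time", "1 day", "", true)
//
// returns
//
//	SELECT create_hypertable('metrics', 'time', chunk_time_interval => interval '1 day', if_not_exists => true)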
// RegisterTimescaleDBTools registers TimescaleDB tools
func RegisterTimescaleDBTools(registry interface{}) error {
// Cast the registry to the expected type
toolRegistry, ok := registry.(*ToolTypeFactory)
if !ok {
return fmt.Errorf("invalid registry type")
}
// Create the TimescaleDB tool
tool := NewTimescaleDBTool()
// Register it with the factory
toolRegistry.Register(tool)
return nil
}
```