This is page 6 of 7. Use http://codebase.md/freepeak/db-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cm
│ └── gitstream.cm
├── .cursor
│ ├── mcp-example.json
│ ├── mcp.json
│ └── rules
│ └── global.mdc
├── .dockerignore
├── .DS_Store
├── .env.example
├── .github
│ ├── FUNDING.yml
│ └── workflows
│ └── go.yml
├── .gitignore
├── .golangci.yml
├── assets
│ └── logo.svg
├── CHANGELOG.md
├── cmd
│ └── server
│ └── main.go
├── commit-message.txt
├── config.json
├── config.timescaledb-test.json
├── docker-compose.mcp-test.yml
├── docker-compose.test.yml
├── docker-compose.timescaledb-test.yml
├── docker-compose.yml
├── docker-wrapper.sh
├── Dockerfile
├── docs
│ ├── REFACTORING.md
│ ├── TIMESCALEDB_FUNCTIONS.md
│ ├── TIMESCALEDB_IMPLEMENTATION.md
│ ├── TIMESCALEDB_PRD.md
│ └── TIMESCALEDB_TOOLS.md
├── examples
│ └── postgres_connection.go
├── glama.json
├── go.mod
├── go.sum
├── init-scripts
│ └── timescaledb
│ ├── 01-init.sql
│ ├── 02-sample-data.sql
│ ├── 03-continuous-aggregates.sql
│ └── README.md
├── internal
│ ├── config
│ │ ├── config_test.go
│ │ └── config.go
│ ├── delivery
│ │ └── mcp
│ │ ├── compression_policy_test.go
│ │ ├── context
│ │ │ ├── hypertable_schema_test.go
│ │ │ ├── timescale_completion_test.go
│ │ │ ├── timescale_context_test.go
│ │ │ └── timescale_query_suggestion_test.go
│ │ ├── mock_test.go
│ │ ├── response_test.go
│ │ ├── response.go
│ │ ├── retention_policy_test.go
│ │ ├── server_wrapper.go
│ │ ├── timescale_completion.go
│ │ ├── timescale_context.go
│ │ ├── timescale_schema.go
│ │ ├── timescale_tool_test.go
│ │ ├── timescale_tool.go
│ │ ├── timescale_tools_test.go
│ │ ├── tool_registry.go
│ │ └── tool_types.go
│ ├── domain
│ │ └── database.go
│ ├── logger
│ │ ├── logger_test.go
│ │ └── logger.go
│ ├── repository
│ │ └── database_repository.go
│ └── usecase
│ └── database_usecase.go
├── LICENSE
├── Makefile
├── pkg
│ ├── core
│ │ ├── core.go
│ │ └── logging.go
│ ├── db
│ │ ├── db_test.go
│ │ ├── db.go
│ │ ├── manager.go
│ │ ├── README.md
│ │ └── timescale
│ │ ├── config_test.go
│ │ ├── config.go
│ │ ├── connection_test.go
│ │ ├── connection.go
│ │ ├── continuous_aggregate_test.go
│ │ ├── continuous_aggregate.go
│ │ ├── hypertable_test.go
│ │ ├── hypertable.go
│ │ ├── metadata.go
│ │ ├── mocks_test.go
│ │ ├── policy_test.go
│ │ ├── policy.go
│ │ ├── query.go
│ │ ├── timeseries_test.go
│ │ └── timeseries.go
│ ├── dbtools
│ │ ├── db_helpers.go
│ │ ├── dbtools_test.go
│ │ ├── dbtools.go
│ │ ├── exec.go
│ │ ├── performance_test.go
│ │ ├── performance.go
│ │ ├── query.go
│ │ ├── querybuilder_test.go
│ │ ├── querybuilder.go
│ │ ├── README.md
│ │ ├── schema_test.go
│ │ ├── schema.go
│ │ ├── tx_test.go
│ │ └── tx.go
│ ├── internal
│ │ └── logger
│ │ └── logger.go
│ ├── jsonrpc
│ │ └── jsonrpc.go
│ ├── logger
│ │ └── logger.go
│ └── tools
│ └── tools.go
├── README-old.md
├── README.md
├── repomix-output.txt
├── request.json
├── start-mcp.sh
├── test.Dockerfile
├── timescaledb-test.sh
└── wait-for-it.sh
```
# Files
--------------------------------------------------------------------------------
/internal/delivery/mcp/timescale_tool.go:
--------------------------------------------------------------------------------
```go
1 | package mcp
2 |
3 | import (
4 | "context"
5 | "encoding/json"
6 | "fmt"
7 | "strconv"
8 | "strings"
9 |
10 | "github.com/FreePeak/cortex/pkg/server"
11 | cortextools "github.com/FreePeak/cortex/pkg/tools"
12 | )
13 |
14 | // TimescaleDBTool implements a tool for TimescaleDB operations
15 | type TimescaleDBTool struct {
16 | name string
17 | description string
18 | }
19 |
20 | // NewTimescaleDBTool creates a new TimescaleDB tool
21 | func NewTimescaleDBTool() *TimescaleDBTool {
22 | return &TimescaleDBTool{
23 | name: "timescaledb",
24 | description: "Perform TimescaleDB operations",
25 | }
26 | }
27 |
28 | // GetName returns the name of the tool
29 | func (t *TimescaleDBTool) GetName() string {
30 | return t.name
31 | }
32 |
33 | // GetDescription returns the description of the tool
34 | func (t *TimescaleDBTool) GetDescription(dbID string) string {
35 | if dbID == "" {
36 | return t.description
37 | }
38 | return fmt.Sprintf("%s on %s", t.description, dbID)
39 | }
40 |
41 | // CreateTool creates the TimescaleDB tool
42 | func (t *TimescaleDBTool) CreateTool(name string, dbID string) interface{} {
43 | // Create main tool that describes the available operations
44 | mainTool := cortextools.NewTool(
45 | name,
46 | cortextools.WithDescription(t.GetDescription(dbID)),
47 | cortextools.WithString("operation",
48 | cortextools.Description("TimescaleDB operation to perform"),
49 | cortextools.Required(),
50 | ),
51 | cortextools.WithString("target_table",
52 | cortextools.Description("The table to perform the operation on"),
53 | ),
54 | )
55 |
56 | return mainTool
57 | }
58 |
59 | // CreateHypertableTool creates a specific tool for hypertable creation
60 | func (t *TimescaleDBTool) CreateHypertableTool(name string, dbID string) interface{} {
61 | return cortextools.NewTool(
62 | name,
63 | cortextools.WithDescription(fmt.Sprintf("Create TimescaleDB hypertable on %s", dbID)),
64 | cortextools.WithString("operation",
65 | cortextools.Description("The operation must be 'create_hypertable'"),
66 | cortextools.Required(),
67 | ),
68 | cortextools.WithString("target_table",
69 | cortextools.Description("The table to convert to a hypertable"),
70 | cortextools.Required(),
71 | ),
72 | cortextools.WithString("time_column",
73 | cortextools.Description("The timestamp column for the hypertable"),
74 | cortextools.Required(),
75 | ),
76 | cortextools.WithString("chunk_time_interval",
77 | cortextools.Description("Time interval for chunks (e.g., '1 day')"),
78 | ),
79 | cortextools.WithString("partitioning_column",
80 | cortextools.Description("Optional column for space partitioning"),
81 | ),
82 | cortextools.WithBoolean("if_not_exists",
83 | cortextools.Description("Skip if hypertable already exists"),
84 | ),
85 | )
86 | }
87 |
88 | // CreateListHypertablesTool creates a specific tool for listing hypertables
89 | func (t *TimescaleDBTool) CreateListHypertablesTool(name string, dbID string) interface{} {
90 | return cortextools.NewTool(
91 | name,
92 | cortextools.WithDescription(fmt.Sprintf("List TimescaleDB hypertables on %s", dbID)),
93 | cortextools.WithString("operation",
94 | cortextools.Description("The operation must be 'list_hypertables'"),
95 | cortextools.Required(),
96 | ),
97 | )
98 | }
99 |
100 | // CreateCompressionEnableTool creates a tool for enabling compression on a hypertable
101 | func (t *TimescaleDBTool) CreateCompressionEnableTool(name string, dbID string) interface{} {
102 | return cortextools.NewTool(
103 | name,
104 | cortextools.WithDescription(fmt.Sprintf("Enable compression on TimescaleDB hypertable on %s", dbID)),
105 | cortextools.WithString("operation",
106 | cortextools.Description("The operation must be 'enable_compression'"),
107 | cortextools.Required(),
108 | ),
109 | cortextools.WithString("target_table",
110 | cortextools.Description("The hypertable to enable compression on"),
111 | cortextools.Required(),
112 | ),
113 | cortextools.WithString("after",
114 | cortextools.Description("Time interval after which to compress chunks (e.g., '7 days')"),
115 | ),
116 | )
117 | }
118 |
119 | // CreateCompressionDisableTool creates a tool for disabling compression on a hypertable
120 | func (t *TimescaleDBTool) CreateCompressionDisableTool(name string, dbID string) interface{} {
121 | return cortextools.NewTool(
122 | name,
123 | cortextools.WithDescription(fmt.Sprintf("Disable compression on TimescaleDB hypertable on %s", dbID)),
124 | cortextools.WithString("operation",
125 | cortextools.Description("The operation must be 'disable_compression'"),
126 | cortextools.Required(),
127 | ),
128 | cortextools.WithString("target_table",
129 | cortextools.Description("The hypertable to disable compression on"),
130 | cortextools.Required(),
131 | ),
132 | )
133 | }
134 |
135 | // CreateCompressionPolicyAddTool creates a tool for adding a compression policy
136 | func (t *TimescaleDBTool) CreateCompressionPolicyAddTool(name string, dbID string) interface{} {
137 | return cortextools.NewTool(
138 | name,
139 | cortextools.WithDescription(fmt.Sprintf("Add compression policy to TimescaleDB hypertable on %s", dbID)),
140 | cortextools.WithString("operation",
141 | cortextools.Description("The operation must be 'add_compression_policy'"),
142 | cortextools.Required(),
143 | ),
144 | cortextools.WithString("target_table",
145 | cortextools.Description("The hypertable to add compression policy to"),
146 | cortextools.Required(),
147 | ),
148 | cortextools.WithString("interval",
149 | cortextools.Description("Time interval after which to compress chunks (e.g., '30 days')"),
150 | cortextools.Required(),
151 | ),
152 | cortextools.WithString("segment_by",
153 | cortextools.Description("Column to use for segmenting data during compression"),
154 | ),
155 | cortextools.WithString("order_by",
156 | cortextools.Description("Column(s) to use for ordering data during compression"),
157 | ),
158 | )
159 | }
160 |
161 | // CreateCompressionPolicyRemoveTool creates a tool for removing a compression policy
162 | func (t *TimescaleDBTool) CreateCompressionPolicyRemoveTool(name string, dbID string) interface{} {
163 | return cortextools.NewTool(
164 | name,
165 | cortextools.WithDescription(fmt.Sprintf("Remove compression policy from TimescaleDB hypertable on %s", dbID)),
166 | cortextools.WithString("operation",
167 | cortextools.Description("The operation must be 'remove_compression_policy'"),
168 | cortextools.Required(),
169 | ),
170 | cortextools.WithString("target_table",
171 | cortextools.Description("The hypertable to remove compression policy from"),
172 | cortextools.Required(),
173 | ),
174 | )
175 | }
176 |
177 | // CreateCompressionSettingsTool creates a tool for retrieving compression settings
178 | func (t *TimescaleDBTool) CreateCompressionSettingsTool(name string, dbID string) interface{} {
179 | return cortextools.NewTool(
180 | name,
181 | cortextools.WithDescription(fmt.Sprintf("Get compression settings for TimescaleDB hypertable on %s", dbID)),
182 | cortextools.WithString("operation",
183 | cortextools.Description("The operation must be 'get_compression_settings'"),
184 | cortextools.Required(),
185 | ),
186 | cortextools.WithString("target_table",
187 | cortextools.Description("The hypertable to get compression settings for"),
188 | cortextools.Required(),
189 | ),
190 | )
191 | }
192 |
193 | // CreateRetentionPolicyTool creates a specific tool for managing retention policies
194 | func (t *TimescaleDBTool) CreateRetentionPolicyTool(name string, dbID string) interface{} {
195 | return cortextools.NewTool(
196 | name,
197 | cortextools.WithDescription(fmt.Sprintf("Manage TimescaleDB retention policies on %s", dbID)),
198 | cortextools.WithString("operation",
199 | cortextools.Description("The operation must be one of: add_retention_policy, remove_retention_policy, get_retention_policy"),
200 | cortextools.Required(),
201 | ),
202 | cortextools.WithString("target_table",
203 | cortextools.Description("The hypertable to manage retention policy for"),
204 | cortextools.Required(),
205 | ),
206 | cortextools.WithString("retention_interval",
207 | cortextools.Description("Time interval for data retention (e.g., '30 days', '6 months')"),
208 | ),
209 | )
210 | }
211 |
212 | // CreateTimeSeriesQueryTool creates a specific tool for time-series queries
213 | func (t *TimescaleDBTool) CreateTimeSeriesQueryTool(name string, dbID string) interface{} {
214 | return cortextools.NewTool(
215 | name,
216 | cortextools.WithDescription(fmt.Sprintf("Execute time-series queries on TimescaleDB %s", dbID)),
217 | cortextools.WithString("operation",
218 | cortextools.Description("The operation must be 'time_series_query'"),
219 | cortextools.Required(),
220 | ),
221 | cortextools.WithString("target_table",
222 | cortextools.Description("The table to query"),
223 | cortextools.Required(),
224 | ),
225 | cortextools.WithString("time_column",
226 | cortextools.Description("The timestamp column for time bucketing"),
227 | cortextools.Required(),
228 | ),
229 | cortextools.WithString("bucket_interval",
230 | cortextools.Description("Time bucket interval (e.g., '1 hour', '1 day')"),
231 | cortextools.Required(),
232 | ),
233 | cortextools.WithString("start_time",
234 | cortextools.Description("Start of time range (e.g., '2023-01-01')"),
235 | ),
236 | cortextools.WithString("end_time",
237 | cortextools.Description("End of time range (e.g., '2023-01-31')"),
238 | ),
239 | cortextools.WithString("aggregations",
240 | cortextools.Description("Comma-separated list of aggregations (e.g., 'AVG(temp),MAX(temp),COUNT(*)')"),
241 | ),
242 | cortextools.WithString("where_condition",
243 | cortextools.Description("Additional WHERE conditions"),
244 | ),
245 | cortextools.WithString("group_by",
246 | cortextools.Description("Additional GROUP BY columns (comma-separated)"),
247 | ),
248 | cortextools.WithString("order_by",
249 | cortextools.Description("Order by clause (default: time_bucket)"),
250 | ),
251 | cortextools.WithString("window_functions",
252 | cortextools.Description("Window functions to include (e.g. 'LAG(value) OVER (ORDER BY time_bucket) AS prev_value')"),
253 | ),
254 | cortextools.WithString("limit",
255 | cortextools.Description("Maximum number of rows to return"),
256 | ),
257 | cortextools.WithBoolean("format_pretty",
258 | cortextools.Description("Whether to format the response in a more readable way"),
259 | ),
260 | )
261 | }
262 |
263 | // CreateTimeSeriesAnalyzeTool creates a specific tool for analyzing time-series data
264 | func (t *TimescaleDBTool) CreateTimeSeriesAnalyzeTool(name string, dbID string) interface{} {
265 | return cortextools.NewTool(
266 | name,
267 | cortextools.WithDescription(fmt.Sprintf("Analyze time-series data patterns on TimescaleDB %s", dbID)),
268 | cortextools.WithString("operation",
269 | cortextools.Description("The operation must be 'analyze_time_series'"),
270 | cortextools.Required(),
271 | ),
272 | cortextools.WithString("target_table",
273 | cortextools.Description("The table to analyze"),
274 | cortextools.Required(),
275 | ),
276 | cortextools.WithString("time_column",
277 | cortextools.Description("The timestamp column"),
278 | cortextools.Required(),
279 | ),
280 | cortextools.WithString("start_time",
281 | cortextools.Description("Start of time range (e.g., '2023-01-01')"),
282 | ),
283 | cortextools.WithString("end_time",
284 | cortextools.Description("End of time range (e.g., '2023-01-31')"),
285 | ),
286 | )
287 | }
288 |
289 | // CreateContinuousAggregateTool creates a specific tool for creating continuous aggregates
290 | func (t *TimescaleDBTool) CreateContinuousAggregateTool(name string, dbID string) interface{} {
291 | return cortextools.NewTool(
292 | name,
293 | cortextools.WithDescription(fmt.Sprintf("Create TimescaleDB continuous aggregate on %s", dbID)),
294 | cortextools.WithString("operation",
295 | cortextools.Description("The operation must be 'create_continuous_aggregate'"),
296 | cortextools.Required(),
297 | ),
298 | cortextools.WithString("view_name",
299 | cortextools.Description("Name for the continuous aggregate view"),
300 | cortextools.Required(),
301 | ),
302 | cortextools.WithString("source_table",
303 | cortextools.Description("Source table with raw data"),
304 | cortextools.Required(),
305 | ),
306 | cortextools.WithString("time_column",
307 | cortextools.Description("Time column to bucket"),
308 | cortextools.Required(),
309 | ),
310 | cortextools.WithString("bucket_interval",
311 | cortextools.Description("Time bucket interval (e.g., '1 hour', '1 day')"),
312 | cortextools.Required(),
313 | ),
314 | cortextools.WithString("aggregations",
315 | cortextools.Description("Comma-separated list of aggregations (e.g., 'AVG(temp),MAX(temp),COUNT(*)')"),
316 | ),
317 | cortextools.WithString("where_condition",
318 | cortextools.Description("WHERE condition to filter source data"),
319 | ),
320 | cortextools.WithBoolean("with_data",
321 | cortextools.Description("Whether to materialize data immediately"),
322 | ),
323 | cortextools.WithBoolean("refresh_policy",
324 | cortextools.Description("Whether to add a refresh policy"),
325 | ),
326 | cortextools.WithString("refresh_interval",
327 | cortextools.Description("Refresh interval (e.g., '1 day')"),
328 | ),
329 | )
330 | }
331 |
332 | // CreateContinuousAggregateRefreshTool creates a specific tool for refreshing continuous aggregates
333 | func (t *TimescaleDBTool) CreateContinuousAggregateRefreshTool(name string, dbID string) interface{} {
334 | return cortextools.NewTool(
335 | name,
336 | cortextools.WithDescription(fmt.Sprintf("Refresh TimescaleDB continuous aggregate on %s", dbID)),
337 | cortextools.WithString("operation",
338 | cortextools.Description("The operation must be 'refresh_continuous_aggregate'"),
339 | cortextools.Required(),
340 | ),
341 | cortextools.WithString("view_name",
342 | cortextools.Description("Name of the continuous aggregate view"),
343 | cortextools.Required(),
344 | ),
345 | cortextools.WithString("start_time",
346 | cortextools.Description("Start of time range to refresh (e.g., '2023-01-01')"),
347 | ),
348 | cortextools.WithString("end_time",
349 | cortextools.Description("End of time range to refresh (e.g., '2023-01-31')"),
350 | ),
351 | )
352 | }
353 |
354 | // CreateContinuousAggregateDropTool creates a specific tool for dropping continuous aggregates
355 | func (t *TimescaleDBTool) CreateContinuousAggregateDropTool(name string, dbID string) interface{} {
356 | return cortextools.NewTool(
357 | name,
358 | cortextools.WithDescription(fmt.Sprintf("Drop TimescaleDB continuous aggregate on %s", dbID)),
359 | cortextools.WithString("operation",
360 | cortextools.Description("The operation must be 'drop_continuous_aggregate'"),
361 | cortextools.Required(),
362 | ),
363 | cortextools.WithString("view_name",
364 | cortextools.Description("Name of the continuous aggregate view to drop"),
365 | cortextools.Required(),
366 | ),
367 | cortextools.WithBoolean("cascade",
368 | cortextools.Description("Whether to drop dependent objects as well"),
369 | ),
370 | )
371 | }
372 |
373 | // CreateContinuousAggregateListTool creates a specific tool for listing continuous aggregates
374 | func (t *TimescaleDBTool) CreateContinuousAggregateListTool(name string, dbID string) interface{} {
375 | return cortextools.NewTool(
376 | name,
377 | cortextools.WithDescription(fmt.Sprintf("List TimescaleDB continuous aggregates on %s", dbID)),
378 | cortextools.WithString("operation",
379 | cortextools.Description("The operation must be 'list_continuous_aggregates'"),
380 | cortextools.Required(),
381 | ),
382 | )
383 | }
384 |
385 | // CreateContinuousAggregateInfoTool creates a specific tool for getting continuous aggregate information
386 | func (t *TimescaleDBTool) CreateContinuousAggregateInfoTool(name string, dbID string) interface{} {
387 | return cortextools.NewTool(
388 | name,
389 | cortextools.WithDescription(fmt.Sprintf("Get information about a TimescaleDB continuous aggregate on %s", dbID)),
390 | cortextools.WithString("operation",
391 | cortextools.Description("The operation must be 'get_continuous_aggregate_info'"),
392 | cortextools.Required(),
393 | ),
394 | cortextools.WithString("view_name",
395 | cortextools.Description("Name of the continuous aggregate view"),
396 | cortextools.Required(),
397 | ),
398 | )
399 | }
400 |
401 | // CreateContinuousAggregatePolicyAddTool creates a specific tool for adding a refresh policy
402 | func (t *TimescaleDBTool) CreateContinuousAggregatePolicyAddTool(name string, dbID string) interface{} {
403 | return cortextools.NewTool(
404 | name,
405 | cortextools.WithDescription(fmt.Sprintf("Add refresh policy to TimescaleDB continuous aggregate on %s", dbID)),
406 | cortextools.WithString("operation",
407 | cortextools.Description("The operation must be 'add_continuous_aggregate_policy'"),
408 | cortextools.Required(),
409 | ),
410 | cortextools.WithString("view_name",
411 | cortextools.Description("Name of the continuous aggregate view"),
412 | cortextools.Required(),
413 | ),
414 | cortextools.WithString("start_offset",
415 | cortextools.Description("How far to look back for data to refresh (e.g., '1 week')"),
416 | cortextools.Required(),
417 | ),
418 | cortextools.WithString("end_offset",
419 | cortextools.Description("How recent of data to refresh (e.g., '1 hour')"),
420 | cortextools.Required(),
421 | ),
422 | cortextools.WithString("schedule_interval",
423 | cortextools.Description("How often to refresh data (e.g., '1 day')"),
424 | cortextools.Required(),
425 | ),
426 | )
427 | }
428 |
429 | // CreateContinuousAggregatePolicyRemoveTool creates a specific tool for removing a refresh policy
430 | func (t *TimescaleDBTool) CreateContinuousAggregatePolicyRemoveTool(name string, dbID string) interface{} {
431 | return cortextools.NewTool(
432 | name,
433 | cortextools.WithDescription(fmt.Sprintf("Remove refresh policy from TimescaleDB continuous aggregate on %s", dbID)),
434 | cortextools.WithString("operation",
435 | cortextools.Description("The operation must be 'remove_continuous_aggregate_policy'"),
436 | cortextools.Required(),
437 | ),
438 | cortextools.WithString("view_name",
439 | cortextools.Description("Name of the continuous aggregate view"),
440 | cortextools.Required(),
441 | ),
442 | )
443 | }
444 |
445 | // HandleRequest handles a tool request
446 | func (t *TimescaleDBTool) HandleRequest(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
447 | // Extract parameters from the request
448 | if request.Parameters == nil {
449 | return nil, fmt.Errorf("missing parameters")
450 | }
451 |
452 | operation, ok := request.Parameters["operation"].(string)
453 | if !ok || operation == "" {
454 | return nil, fmt.Errorf("operation parameter is required")
455 | }
456 |
457 | // Route to the appropriate handler based on the operation
458 | switch strings.ToLower(operation) {
459 | case "create_hypertable":
460 | return t.handleCreateHypertable(ctx, request, dbID, useCase)
461 | case "list_hypertables":
462 | return t.handleListHypertables(ctx, request, dbID, useCase)
463 | case "enable_compression":
464 | return t.handleEnableCompression(ctx, request, dbID, useCase)
465 | case "disable_compression":
466 | return t.handleDisableCompression(ctx, request, dbID, useCase)
467 | case "add_compression_policy":
468 | return t.handleAddCompressionPolicy(ctx, request, dbID, useCase)
469 | case "remove_compression_policy":
470 | return t.handleRemoveCompressionPolicy(ctx, request, dbID, useCase)
471 | case "get_compression_settings":
472 | return t.handleGetCompressionSettings(ctx, request, dbID, useCase)
473 | case "add_retention_policy":
474 | return t.handleAddRetentionPolicy(ctx, request, dbID, useCase)
475 | case "remove_retention_policy":
476 | return t.handleRemoveRetentionPolicy(ctx, request, dbID, useCase)
477 | case "get_retention_policy":
478 | return t.handleGetRetentionPolicy(ctx, request, dbID, useCase)
479 | case "time_series_query":
480 | return t.handleTimeSeriesQuery(ctx, request, dbID, useCase)
481 | case "analyze_time_series":
482 | return t.handleTimeSeriesAnalyze(ctx, request, dbID, useCase)
483 | case "create_continuous_aggregate":
484 | return t.handleCreateContinuousAggregate(ctx, request, dbID, useCase)
485 | case "refresh_continuous_aggregate":
486 | return t.handleRefreshContinuousAggregate(ctx, request, dbID, useCase)
487 | case "drop_continuous_aggregate":
488 | return t.handleDropContinuousAggregate(ctx, request, dbID, useCase)
489 | case "list_continuous_aggregates":
490 | return t.handleListContinuousAggregates(ctx, request, dbID, useCase)
491 | case "get_continuous_aggregate_info":
492 | return t.handleGetContinuousAggregateInfo(ctx, request, dbID, useCase)
493 | case "add_continuous_aggregate_policy":
494 | return t.handleAddContinuousAggregatePolicy(ctx, request, dbID, useCase)
495 | case "remove_continuous_aggregate_policy":
496 | return t.handleRemoveContinuousAggregatePolicy(ctx, request, dbID, useCase)
497 | default:
498 | return map[string]interface{}{"message": fmt.Sprintf("Operation '%s' not implemented yet", operation)}, nil
499 | }
500 | }
501 |
502 | // handleCreateHypertable handles the create_hypertable operation
503 | func (t *TimescaleDBTool) handleCreateHypertable(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
504 | // Extract required parameters
505 | targetTable, ok := request.Parameters["target_table"].(string)
506 | if !ok || targetTable == "" {
507 | return nil, fmt.Errorf("target_table parameter is required")
508 | }
509 |
510 | timeColumn, ok := request.Parameters["time_column"].(string)
511 | if !ok || timeColumn == "" {
512 | return nil, fmt.Errorf("time_column parameter is required")
513 | }
514 |
515 | // Extract optional parameters
516 | chunkTimeInterval := getStringParam(request.Parameters, "chunk_time_interval")
517 | partitioningColumn := getStringParam(request.Parameters, "partitioning_column")
518 | ifNotExists := getBoolParam(request.Parameters, "if_not_exists")
519 |
520 | // Build the SQL statement to create a hypertable
521 | sql := buildCreateHypertableSQL(targetTable, timeColumn, chunkTimeInterval, partitioningColumn, ifNotExists)
522 |
523 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
524 | dbType, err := useCase.GetDatabaseType(dbID)
525 | if err != nil {
526 | return nil, fmt.Errorf("failed to get database type: %w", err)
527 | }
528 |
529 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
530 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
531 | }
532 |
533 | // Execute the statement
534 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
535 | if err != nil {
536 | return nil, fmt.Errorf("failed to create hypertable: %w", err)
537 | }
538 |
539 | return map[string]interface{}{
540 | "message": fmt.Sprintf("Successfully created hypertable '%s' with time column '%s'", targetTable, timeColumn),
541 | "details": result,
542 | }, nil
543 | }
544 |
545 | // handleListHypertables handles the list_hypertables operation
546 | func (t *TimescaleDBTool) handleListHypertables(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
547 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
548 | dbType, err := useCase.GetDatabaseType(dbID)
549 | if err != nil {
550 | return nil, fmt.Errorf("failed to get database type: %w", err)
551 | }
552 |
553 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
554 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
555 | }
556 |
557 | // Build the SQL query to list hypertables
558 | sql := `
559 | SELECT h.table_name, h.schema_name, d.column_name as time_column,
560 | count(d.id) as num_dimensions,
561 | (
562 | SELECT column_name FROM _timescaledb_catalog.dimension
563 | WHERE hypertable_id = h.id AND column_type != 'TIMESTAMP'
564 | AND column_type != 'TIMESTAMPTZ'
565 | LIMIT 1
566 | ) as space_column
567 | FROM _timescaledb_catalog.hypertable h
568 | JOIN _timescaledb_catalog.dimension d ON h.id = d.hypertable_id
569 | GROUP BY h.id, h.table_name, h.schema_name
570 | `
571 |
572 | // Execute the statement
573 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
574 | if err != nil {
575 | return nil, fmt.Errorf("failed to list hypertables: %w", err)
576 | }
577 |
578 | return map[string]interface{}{
579 | "message": "Successfully retrieved hypertables list",
580 | "details": result,
581 | }, nil
582 | }
583 |
584 | // handleEnableCompression handles the enable_compression operation
585 | func (t *TimescaleDBTool) handleEnableCompression(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
586 | // Extract required parameters
587 | targetTable, ok := request.Parameters["target_table"].(string)
588 | if !ok || targetTable == "" {
589 | return nil, fmt.Errorf("target_table parameter is required")
590 | }
591 |
592 | // Extract optional interval parameter
593 | afterInterval := getStringParam(request.Parameters, "after")
594 |
595 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
596 | dbType, err := useCase.GetDatabaseType(dbID)
597 | if err != nil {
598 | return nil, fmt.Errorf("failed to get database type: %w", err)
599 | }
600 |
601 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
602 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
603 | }
604 |
605 | // Build the SQL statement to enable compression
606 | sql := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = true)", targetTable)
607 |
608 | // Execute the statement
609 | _, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
610 | if err != nil {
611 | return nil, fmt.Errorf("failed to enable compression: %w", err)
612 | }
613 |
614 | var message string
615 | // If interval is specified, add compression policy
616 | if afterInterval != "" {
617 | // Build the SQL statement for compression policy
618 | policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s')", targetTable, afterInterval)
619 |
620 | // Execute the statement
621 | _, err = useCase.ExecuteStatement(ctx, dbID, policySQL, nil)
622 | if err != nil {
623 | return nil, fmt.Errorf("failed to add compression policy: %w", err)
624 | }
625 |
626 | message = fmt.Sprintf("Successfully enabled compression on hypertable '%s' with automatic compression after '%s'", targetTable, afterInterval)
627 | } else {
628 | message = fmt.Sprintf("Successfully enabled compression on hypertable '%s'", targetTable)
629 | }
630 |
631 | return map[string]interface{}{
632 | "message": message,
633 | }, nil
634 | }
635 |
636 | // handleDisableCompression handles the disable_compression operation
637 | func (t *TimescaleDBTool) handleDisableCompression(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
638 | // Extract required parameters
639 | targetTable, ok := request.Parameters["target_table"].(string)
640 | if !ok || targetTable == "" {
641 | return nil, fmt.Errorf("target_table parameter is required")
642 | }
643 |
644 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
645 | dbType, err := useCase.GetDatabaseType(dbID)
646 | if err != nil {
647 | return nil, fmt.Errorf("failed to get database type: %w", err)
648 | }
649 |
650 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
651 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
652 | }
653 |
654 | // First, find and remove any existing compression policy
655 | policyQuery := fmt.Sprintf(
656 | "SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_compression'",
657 | targetTable,
658 | )
659 |
660 | policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
661 | if err != nil {
662 | return nil, fmt.Errorf("failed to check for existing compression policy: %w", err)
663 | }
664 |
665 | // Check if a policy exists and remove it
666 | if policyResult != "" && policyResult != "[]" {
667 | // Parse the JSON result
668 | var policies []map[string]interface{}
669 | if err := json.Unmarshal([]byte(policyResult), &policies); err != nil {
670 | return nil, fmt.Errorf("failed to parse policy result: %w", err)
671 | }
672 |
673 | if len(policies) > 0 && policies[0]["job_id"] != nil {
674 | // Remove the policy
675 | jobID := policies[0]["job_id"]
676 | removePolicyQuery := fmt.Sprintf("SELECT remove_compression_policy(%v)", jobID)
677 | _, err = useCase.ExecuteStatement(ctx, dbID, removePolicyQuery, nil)
678 | if err != nil {
679 | return nil, fmt.Errorf("failed to remove compression policy: %w", err)
680 | }
681 | }
682 | }
683 |
684 | // Build the SQL statement to disable compression
685 | sql := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = false)", targetTable)
686 |
687 | // Execute the statement
688 | _, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
689 | if err != nil {
690 | return nil, fmt.Errorf("failed to disable compression: %w", err)
691 | }
692 |
693 | return map[string]interface{}{
694 | "message": fmt.Sprintf("Successfully disabled compression on hypertable '%s'", targetTable),
695 | }, nil
696 | }
697 |
698 | // handleAddCompressionPolicy handles the add_compression_policy operation
699 | func (t *TimescaleDBTool) handleAddCompressionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
700 | // Extract required parameters
701 | targetTable, ok := request.Parameters["target_table"].(string)
702 | if !ok || targetTable == "" {
703 | return nil, fmt.Errorf("target_table parameter is required")
704 | }
705 |
706 | interval, ok := request.Parameters["interval"].(string)
707 | if !ok || interval == "" {
708 | return nil, fmt.Errorf("interval parameter is required")
709 | }
710 |
711 | // Extract optional parameters
712 | segmentBy := getStringParam(request.Parameters, "segment_by")
713 | orderBy := getStringParam(request.Parameters, "order_by")
714 |
715 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
716 | dbType, err := useCase.GetDatabaseType(dbID)
717 | if err != nil {
718 | return nil, fmt.Errorf("failed to get database type: %w", err)
719 | }
720 |
721 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
722 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
723 | }
724 |
725 | // First, check if compression is enabled
726 | compressionQuery := fmt.Sprintf(
727 | "SELECT compress FROM timescaledb_information.hypertables WHERE hypertable_name = '%s'",
728 | targetTable,
729 | )
730 |
731 | compressionResult, err := useCase.ExecuteStatement(ctx, dbID, compressionQuery, nil)
732 | if err != nil {
733 | return nil, fmt.Errorf("failed to check compression status: %w", err)
734 | }
735 |
736 | // Parse the result to check if compression is enabled
737 | var hypertables []map[string]interface{}
738 | if err := json.Unmarshal([]byte(compressionResult), &hypertables); err != nil {
739 | return nil, fmt.Errorf("failed to parse hypertable info: %w", err)
740 | }
741 |
742 | if len(hypertables) == 0 {
743 | return nil, fmt.Errorf("table '%s' is not a hypertable", targetTable)
744 | }
745 |
746 | isCompressed := false
747 | if compress, ok := hypertables[0]["compress"]; ok && compress != nil {
748 | isCompressed = fmt.Sprintf("%v", compress) == "true"
749 | }
750 |
751 | // If compression isn't enabled, enable it first
752 | if !isCompressed {
753 | enableSQL := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress = true)", targetTable)
754 | _, err = useCase.ExecuteStatement(ctx, dbID, enableSQL, nil)
755 | if err != nil {
756 | return nil, fmt.Errorf("failed to enable compression: %w", err)
757 | }
758 | }
759 |
760 | // Build the compression policy SQL
761 | var policyQueryBuilder strings.Builder
762 | policyQueryBuilder.WriteString(fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s'", targetTable, interval))
763 |
764 | if segmentBy != "" {
765 | policyQueryBuilder.WriteString(fmt.Sprintf(", segmentby => '%s'", segmentBy))
766 | }
767 |
768 | if orderBy != "" {
769 | policyQueryBuilder.WriteString(fmt.Sprintf(", orderby => '%s'", orderBy))
770 | }
771 |
772 | policyQueryBuilder.WriteString(")")
773 |
774 | // Execute the statement to add the compression policy
775 | _, err = useCase.ExecuteStatement(ctx, dbID, policyQueryBuilder.String(), nil)
776 | if err != nil {
777 | return nil, fmt.Errorf("failed to add compression policy: %w", err)
778 | }
779 |
780 | return map[string]interface{}{
781 | "message": fmt.Sprintf("Successfully added compression policy to hypertable '%s'", targetTable),
782 | }, nil
783 | }
784 |
785 | // handleRemoveCompressionPolicy handles the remove_compression_policy operation
786 | func (t *TimescaleDBTool) handleRemoveCompressionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
787 | // Extract required parameters
788 | targetTable, ok := request.Parameters["target_table"].(string)
789 | if !ok || targetTable == "" {
790 | return nil, fmt.Errorf("target_table parameter is required")
791 | }
792 |
793 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
794 | dbType, err := useCase.GetDatabaseType(dbID)
795 | if err != nil {
796 | return nil, fmt.Errorf("failed to get database type: %w", err)
797 | }
798 |
799 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
800 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
801 | }
802 |
803 | // Find the policy ID
804 | policyQuery := fmt.Sprintf(
805 | "SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_compression'",
806 | targetTable,
807 | )
808 |
809 | policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
810 | if err != nil {
811 | return nil, fmt.Errorf("failed to find compression policy: %w", err)
812 | }
813 |
814 | // Parse the result to get the job ID
815 | var policies []map[string]interface{}
816 | if err := json.Unmarshal([]byte(policyResult), &policies); err != nil {
817 | return nil, fmt.Errorf("failed to parse policy info: %w", err)
818 | }
819 |
820 | if len(policies) == 0 {
821 | return map[string]interface{}{
822 | "message": fmt.Sprintf("No compression policy found for hypertable '%s'", targetTable),
823 | }, nil
824 | }
825 |
826 | jobID := policies[0]["job_id"]
827 | if jobID == nil {
828 | return nil, fmt.Errorf("invalid job ID for compression policy")
829 | }
830 |
831 | // Remove the policy
832 | removeSQL := fmt.Sprintf("SELECT remove_compression_policy(%v)", jobID)
833 | _, err = useCase.ExecuteStatement(ctx, dbID, removeSQL, nil)
834 | if err != nil {
835 | return nil, fmt.Errorf("failed to remove compression policy: %w", err)
836 | }
837 |
838 | return map[string]interface{}{
839 | "message": fmt.Sprintf("Successfully removed compression policy from hypertable '%s'", targetTable),
840 | }, nil
841 | }
842 |
843 | // handleGetCompressionSettings handles the get_compression_settings operation
844 | func (t *TimescaleDBTool) handleGetCompressionSettings(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
845 | // Extract required parameters
846 | targetTable, ok := request.Parameters["target_table"].(string)
847 | if !ok || targetTable == "" {
848 | return nil, fmt.Errorf("target_table parameter is required")
849 | }
850 |
851 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
852 | dbType, err := useCase.GetDatabaseType(dbID)
853 | if err != nil {
854 | return nil, fmt.Errorf("failed to get database type: %w", err)
855 | }
856 |
857 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
858 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
859 | }
860 |
861 | // Check if the table is a hypertable and has compression enabled
862 | hypertableQuery := fmt.Sprintf(
863 | "SELECT compress FROM timescaledb_information.hypertables WHERE hypertable_name = '%s'",
864 | targetTable,
865 | )
866 |
867 | hypertableResult, err := useCase.ExecuteStatement(ctx, dbID, hypertableQuery, nil)
868 | if err != nil {
869 | return nil, fmt.Errorf("failed to check hypertable info: %w", err)
870 | }
871 |
872 | // Parse the result
873 | var hypertables []map[string]interface{}
874 | if err := json.Unmarshal([]byte(hypertableResult), &hypertables); err != nil {
875 | return nil, fmt.Errorf("failed to parse hypertable info: %w", err)
876 | }
877 |
878 | if len(hypertables) == 0 {
879 | return nil, fmt.Errorf("table '%s' is not a hypertable", targetTable)
880 | }
881 |
882 | // Create settings object
883 | settings := map[string]interface{}{
884 | "hypertable_name": targetTable,
885 | "compression_enabled": false,
886 | "segment_by": nil,
887 | "order_by": nil,
888 | "chunk_time_interval": nil,
889 | "compression_interval": nil,
890 | }
891 |
892 | isCompressed := false
893 | if compress, ok := hypertables[0]["compress"]; ok && compress != nil {
894 | isCompressed = fmt.Sprintf("%v", compress) == "true"
895 | }
896 |
897 | settings["compression_enabled"] = isCompressed
898 |
899 | if isCompressed {
900 | // Get compression settings
901 | compressionQuery := fmt.Sprintf(
902 | "SELECT segmentby, orderby FROM timescaledb_information.compression_settings WHERE hypertable_name = '%s'",
903 | targetTable,
904 | )
905 |
906 | compressionResult, err := useCase.ExecuteStatement(ctx, dbID, compressionQuery, nil)
907 | if err != nil {
908 | return nil, fmt.Errorf("failed to get compression settings: %w", err)
909 | }
910 |
911 | var compressionSettings []map[string]interface{}
912 | if err := json.Unmarshal([]byte(compressionResult), &compressionSettings); err != nil {
913 | return nil, fmt.Errorf("failed to parse compression settings: %w", err)
914 | }
915 |
916 | if len(compressionSettings) > 0 {
917 | if segmentBy, ok := compressionSettings[0]["segmentby"]; ok && segmentBy != nil {
918 | settings["segment_by"] = segmentBy
919 | }
920 |
921 | if orderBy, ok := compressionSettings[0]["orderby"]; ok && orderBy != nil {
922 | settings["order_by"] = orderBy
923 | }
924 | }
925 |
926 | // Get policy information
927 | policyQuery := fmt.Sprintf(
928 | "SELECT s.schedule_interval, h.chunk_time_interval FROM timescaledb_information.jobs j "+
929 | "JOIN timescaledb_information.job_stats s ON j.job_id = s.job_id "+
930 | "JOIN timescaledb_information.hypertables h ON j.hypertable_name = h.hypertable_name "+
931 | "WHERE j.hypertable_name = '%s' AND j.proc_name = 'policy_compression'",
932 | targetTable,
933 | )
934 |
935 | policyResult, err := useCase.ExecuteStatement(ctx, dbID, policyQuery, nil)
936 | if err == nil {
937 | var policyInfo []map[string]interface{}
938 | if err := json.Unmarshal([]byte(policyResult), &policyInfo); err != nil {
939 | return nil, fmt.Errorf("failed to parse policy info: %w", err)
940 | }
941 |
942 | if len(policyInfo) > 0 {
943 | if interval, ok := policyInfo[0]["schedule_interval"]; ok && interval != nil {
944 | settings["compression_interval"] = interval
945 | }
946 |
947 | if chunkInterval, ok := policyInfo[0]["chunk_time_interval"]; ok && chunkInterval != nil {
948 | settings["chunk_time_interval"] = chunkInterval
949 | }
950 | }
951 | }
952 | }
953 |
954 | return map[string]interface{}{
955 | "message": fmt.Sprintf("Retrieved compression settings for hypertable '%s'", targetTable),
956 | "settings": settings,
957 | }, nil
958 | }
959 |
960 | // handleAddRetentionPolicy handles the add_retention_policy operation
961 | func (t *TimescaleDBTool) handleAddRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
962 | // Extract required parameters
963 | targetTable, ok := request.Parameters["target_table"].(string)
964 | if !ok || targetTable == "" {
965 | return nil, fmt.Errorf("target_table parameter is required")
966 | }
967 |
968 | retentionInterval, ok := request.Parameters["retention_interval"].(string)
969 | if !ok || retentionInterval == "" {
970 | return nil, fmt.Errorf("retention_interval parameter is required")
971 | }
972 |
973 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
974 | dbType, err := useCase.GetDatabaseType(dbID)
975 | if err != nil {
976 | return nil, fmt.Errorf("failed to get database type: %w", err)
977 | }
978 |
979 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
980 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
981 | }
982 |
983 | // Build the SQL statement to add a retention policy
984 | sql := fmt.Sprintf("SELECT add_retention_policy('%s', INTERVAL '%s')", targetTable, retentionInterval)
985 |
986 | // Execute the statement
987 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
988 | if err != nil {
989 | return nil, fmt.Errorf("failed to add retention policy: %w", err)
990 | }
991 |
992 | return map[string]interface{}{
993 | "message": fmt.Sprintf("Successfully added retention policy to '%s' with interval '%s'", targetTable, retentionInterval),
994 | "details": result,
995 | }, nil
996 | }
997 |
998 | // handleRemoveRetentionPolicy handles the remove_retention_policy operation
999 | func (t *TimescaleDBTool) handleRemoveRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1000 | // Extract required parameters
1001 | targetTable, ok := request.Parameters["target_table"].(string)
1002 | if !ok || targetTable == "" {
1003 | return nil, fmt.Errorf("target_table parameter is required")
1004 | }
1005 |
1006 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1007 | dbType, err := useCase.GetDatabaseType(dbID)
1008 | if err != nil {
1009 | return nil, fmt.Errorf("failed to get database type: %w", err)
1010 | }
1011 |
1012 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1013 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1014 | }
1015 |
1016 | // First, find the policy job ID
1017 | findPolicySQL := fmt.Sprintf(
1018 | "SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention'",
1019 | targetTable,
1020 | )
1021 |
1022 | // Execute the statement to find the policy
1023 | policyResult, err := useCase.ExecuteStatement(ctx, dbID, findPolicySQL, nil)
1024 | if err != nil {
1025 | return nil, fmt.Errorf("failed to find retention policy: %w", err)
1026 | }
1027 |
1028 | // Check if we found a policy
1029 | if policyResult == "[]" || policyResult == "" {
1030 | return map[string]interface{}{
1031 | "message": fmt.Sprintf("No retention policy found for table '%s'", targetTable),
1032 | }, nil
1033 | }
1034 |
1035 | // Now remove the policy - assuming we received a JSON array with the job_id
1036 | removeSQL := fmt.Sprintf(
1037 | "SELECT remove_retention_policy((SELECT job_id FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention' LIMIT 1))",
1038 | targetTable,
1039 | )
1040 |
1041 | // Execute the statement to remove the policy
1042 | result, err := useCase.ExecuteStatement(ctx, dbID, removeSQL, nil)
1043 | if err != nil {
1044 | return nil, fmt.Errorf("failed to remove retention policy: %w", err)
1045 | }
1046 |
1047 | return map[string]interface{}{
1048 | "message": fmt.Sprintf("Successfully removed retention policy from '%s'", targetTable),
1049 | "details": result,
1050 | }, nil
1051 | }
1052 |
1053 | // handleGetRetentionPolicy handles the get_retention_policy operation
1054 | func (t *TimescaleDBTool) handleGetRetentionPolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1055 | // Extract required parameters
1056 | targetTable, ok := request.Parameters["target_table"].(string)
1057 | if !ok || targetTable == "" {
1058 | return nil, fmt.Errorf("target_table parameter is required")
1059 | }
1060 |
1061 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1062 | dbType, err := useCase.GetDatabaseType(dbID)
1063 | if err != nil {
1064 | return nil, fmt.Errorf("failed to get database type: %w", err)
1065 | }
1066 |
1067 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1068 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1069 | }
1070 |
1071 | // Build the SQL query to get retention policy details
1072 | sql := fmt.Sprintf(`
1073 | SELECT
1074 | '%s' as hypertable_name,
1075 | js.schedule_interval as retention_interval,
1076 | CASE WHEN j.job_id IS NOT NULL THEN true ELSE false END as retention_enabled
1077 | FROM
1078 | timescaledb_information.jobs j
1079 | JOIN
1080 | timescaledb_information.job_stats js ON j.job_id = js.job_id
1081 | WHERE
1082 | j.hypertable_name = '%s' AND j.proc_name = 'policy_retention'
1083 | `, targetTable, targetTable)
1084 |
1085 | // Execute the statement
1086 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1087 | if err != nil {
1088 | return nil, fmt.Errorf("failed to get retention policy: %w", err)
1089 | }
1090 |
1091 | // Check if we got any results
1092 | if result == "[]" || result == "" {
1093 | // No retention policy found, return a default structure
1094 | return map[string]interface{}{
1095 | "message": fmt.Sprintf("No retention policy found for table '%s'", targetTable),
1096 | "details": fmt.Sprintf(`[{"hypertable_name":"%s","retention_enabled":false}]`, targetTable),
1097 | }, nil
1098 | }
1099 |
1100 | return map[string]interface{}{
1101 | "message": fmt.Sprintf("Successfully retrieved retention policy for '%s'", targetTable),
1102 | "details": result,
1103 | }, nil
1104 | }
1105 |
1106 | // handleTimeSeriesQuery handles the time_series_query operation
1107 | func (t *TimescaleDBTool) handleTimeSeriesQuery(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1108 | // Extract required parameters
1109 | targetTable, ok := request.Parameters["target_table"].(string)
1110 | if !ok || targetTable == "" {
1111 | return nil, fmt.Errorf("target_table parameter is required")
1112 | }
1113 |
1114 | timeColumn, ok := request.Parameters["time_column"].(string)
1115 | if !ok || timeColumn == "" {
1116 | return nil, fmt.Errorf("time_column parameter is required")
1117 | }
1118 |
1119 | bucketInterval, ok := request.Parameters["bucket_interval"].(string)
1120 | if !ok || bucketInterval == "" {
1121 | return nil, fmt.Errorf("bucket_interval parameter is required")
1122 | }
1123 |
1124 | // Extract optional parameters
1125 | startTimeStr := getStringParam(request.Parameters, "start_time")
1126 | endTimeStr := getStringParam(request.Parameters, "end_time")
1127 | aggregations := getStringParam(request.Parameters, "aggregations")
1128 | whereCondition := getStringParam(request.Parameters, "where_condition")
1129 | groupBy := getStringParam(request.Parameters, "group_by")
1130 | orderBy := getStringParam(request.Parameters, "order_by")
1131 | windowFunctions := getStringParam(request.Parameters, "window_functions")
1132 | limitStr := getStringParam(request.Parameters, "limit")
1133 | formatPretty := getBoolParam(request.Parameters, "format_pretty")
1134 |
1135 | // Set default values for optional parameters
1136 | if aggregations == "" {
1137 | aggregations = "count(*) as count"
1138 | }
1139 |
1140 | // Build WHERE clause
1141 | whereClause := ""
1142 | if startTimeStr != "" && endTimeStr != "" {
1143 | whereClause = fmt.Sprintf("%s BETWEEN '%s' AND '%s'", timeColumn, startTimeStr, endTimeStr)
1144 | if whereCondition != "" {
1145 | whereClause = fmt.Sprintf("%s AND %s", whereClause, whereCondition)
1146 | }
1147 | } else if whereCondition != "" {
1148 | whereClause = whereCondition
1149 | } else {
1150 | whereClause = "1=1" // Always true if no conditions
1151 | }
1152 |
1153 | // Set default group by if not provided
1154 | if groupBy == "" {
1155 | groupBy = "time_bucket"
1156 | } else {
1157 | groupBy = fmt.Sprintf("time_bucket, %s", groupBy)
1158 | }
1159 |
1160 | // Set default order by if not provided
1161 | if orderBy == "" {
1162 | orderBy = "time_bucket"
1163 | }
1164 |
1165 | // Set default limit if not provided
1166 | limit := 1000 // Default limit
1167 | if limitStr != "" {
1168 | if parsedLimit, err := strconv.Atoi(limitStr); err == nil && parsedLimit > 0 {
1169 | limit = parsedLimit
1170 | }
1171 | }
1172 |
1173 | // Build the base SQL query
1174 | var sql string
1175 | if windowFunctions == "" {
1176 | // Simple query without window functions
1177 | sql = fmt.Sprintf(`
1178 | SELECT
1179 | time_bucket('%s', %s) as time_bucket,
1180 | %s
1181 | FROM
1182 | %s
1183 | WHERE
1184 | %s
1185 | GROUP BY
1186 | %s
1187 | ORDER BY
1188 | %s
1189 | LIMIT %d
1190 | `, bucketInterval, timeColumn, aggregations, targetTable, whereClause, groupBy, orderBy, limit)
1191 | } else {
1192 | // Query with window functions - need to use a subquery
1193 | sql = fmt.Sprintf(`
1194 | SELECT
1195 | time_bucket,
1196 | %s,
1197 | %s
1198 | FROM (
1199 | SELECT
1200 | time_bucket('%s', %s) as time_bucket,
1201 | %s
1202 | FROM
1203 | %s
1204 | WHERE
1205 | %s
1206 | GROUP BY
1207 | %s
1208 | ORDER BY
1209 | %s
1210 | ) AS sub
1211 | ORDER BY
1212 | %s
1213 | LIMIT %d
1214 | `, aggregations, windowFunctions, bucketInterval, timeColumn, aggregations, targetTable, whereClause, groupBy, orderBy, orderBy, limit)
1215 | }
1216 |
1217 | // Execute the query
1218 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1219 | if err != nil {
1220 | return nil, fmt.Errorf("failed to execute time-series query: %w", err)
1221 | }
1222 |
1223 | // Generate the response
1224 | response := map[string]interface{}{
1225 | "message": "Successfully retrieved time-series data",
1226 | "details": result,
1227 | }
1228 |
1229 | // Add metadata if pretty format is requested
1230 | if formatPretty {
1231 | // Try to parse the result JSON for better presentation
1232 | var resultData []map[string]interface{}
1233 | if err := json.Unmarshal([]byte(result), &resultData); err == nil {
1234 | // Add statistics about the data
1235 | numRows := len(resultData)
1236 | response = addMetadata(response, "num_rows", numRows)
1237 | response = addMetadata(response, "time_bucket_interval", bucketInterval)
1238 |
1239 | if numRows > 0 {
1240 | // Extract time range from the data if available
1241 | if firstBucket, ok := resultData[0]["time_bucket"].(string); ok {
1242 | response = addMetadata(response, "first_bucket", firstBucket)
1243 | }
1244 | if lastBucket, ok := resultData[numRows-1]["time_bucket"].(string); ok {
1245 | response = addMetadata(response, "last_bucket", lastBucket)
1246 | }
1247 | }
1248 | }
1249 | }
1250 |
1251 | return response, nil
1252 | }
1253 |
1254 | // handleTimeSeriesAnalyze handles the analyze_time_series operation
1255 | func (t *TimescaleDBTool) handleTimeSeriesAnalyze(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1256 | // Extract required parameters
1257 | targetTable, ok := request.Parameters["target_table"].(string)
1258 | if !ok || targetTable == "" {
1259 | return nil, fmt.Errorf("target_table parameter is required")
1260 | }
1261 |
1262 | timeColumn, ok := request.Parameters["time_column"].(string)
1263 | if !ok || timeColumn == "" {
1264 | return nil, fmt.Errorf("time_column parameter is required")
1265 | }
1266 |
1267 | // Extract optional parameters
1268 | startTimeStr := getStringParam(request.Parameters, "start_time")
1269 | endTimeStr := getStringParam(request.Parameters, "end_time")
1270 |
1271 | // Build WHERE clause
1272 | whereClause := ""
1273 | if startTimeStr != "" && endTimeStr != "" {
1274 | whereClause = fmt.Sprintf("WHERE %s BETWEEN '%s' AND '%s'", timeColumn, startTimeStr, endTimeStr)
1275 | }
1276 |
1277 | // Build the SQL query for basic time series analysis
1278 | sql := fmt.Sprintf(`
1279 | SELECT
1280 | COUNT(*) as row_count,
1281 | MIN(%s) as min_time,
1282 | MAX(%s) as max_time,
1283 | (MAX(%s) - MIN(%s)) as time_span,
1284 | COUNT(DISTINCT date_trunc('day', %s)) as unique_days
1285 | FROM
1286 | %s
1287 | %s
1288 | `, timeColumn, timeColumn, timeColumn, timeColumn, timeColumn, targetTable, whereClause)
1289 |
1290 | // Execute the query
1291 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1292 | if err != nil {
1293 | return nil, fmt.Errorf("failed to analyze time-series data: %w", err)
1294 | }
1295 |
1296 | return map[string]interface{}{
1297 | "message": "Successfully analyzed time-series data",
1298 | "details": result,
1299 | }, nil
1300 | }
1301 |
1302 | // handleCreateContinuousAggregate handles the create_continuous_aggregate operation
1303 | func (t *TimescaleDBTool) handleCreateContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1304 | // Extract required parameters
1305 | viewName, ok := request.Parameters["view_name"].(string)
1306 | if !ok || viewName == "" {
1307 | return nil, fmt.Errorf("view_name parameter is required")
1308 | }
1309 |
1310 | sourceTable, ok := request.Parameters["source_table"].(string)
1311 | if !ok || sourceTable == "" {
1312 | return nil, fmt.Errorf("source_table parameter is required")
1313 | }
1314 |
1315 | timeColumn, ok := request.Parameters["time_column"].(string)
1316 | if !ok || timeColumn == "" {
1317 | return nil, fmt.Errorf("time_column parameter is required")
1318 | }
1319 |
1320 | bucketInterval, ok := request.Parameters["bucket_interval"].(string)
1321 | if !ok || bucketInterval == "" {
1322 | return nil, fmt.Errorf("bucket_interval parameter is required")
1323 | }
1324 |
1325 | // Extract optional parameters
1326 | aggregationsStr := getStringParam(request.Parameters, "aggregations")
1327 | whereCondition := getStringParam(request.Parameters, "where_condition")
1328 | withData := getBoolParam(request.Parameters, "with_data")
1329 | refreshPolicy := getBoolParam(request.Parameters, "refresh_policy")
1330 | refreshInterval := getStringParam(request.Parameters, "refresh_interval")
1331 |
1332 | // Parse aggregations from comma-separated string
1333 | var aggregationsParts []string
1334 | if aggregationsStr != "" {
1335 | aggregationsParts = strings.Split(aggregationsStr, ",")
1336 | } else {
1337 | // Default aggregation if none specified
1338 | aggregationsParts = []string{"COUNT(*) AS count"}
1339 | }
1340 |
1341 | // Build the SQL statement to create a continuous aggregate
1342 | var builder strings.Builder
1343 | builder.WriteString("CREATE MATERIALIZED VIEW ")
1344 | builder.WriteString(viewName)
1345 | builder.WriteString("\nAS SELECT\n time_bucket('")
1346 | builder.WriteString(bucketInterval)
1347 | builder.WriteString("', ")
1348 | builder.WriteString(timeColumn)
1349 | builder.WriteString(") AS time_bucket")
1350 |
1351 | // Add aggregations
1352 | for _, agg := range aggregationsParts {
1353 | builder.WriteString(",\n ")
1354 | builder.WriteString(strings.TrimSpace(agg))
1355 | }
1356 |
1357 | // Add FROM clause
1358 | builder.WriteString("\nFROM ")
1359 | builder.WriteString(sourceTable)
1360 |
1361 | // Add WHERE clause if specified
1362 | if whereCondition != "" {
1363 | builder.WriteString("\nWHERE ")
1364 | builder.WriteString(whereCondition)
1365 | }
1366 |
1367 | // Add GROUP BY clause
1368 | builder.WriteString("\nGROUP BY time_bucket")
1369 |
1370 | // Add WITH DATA or WITH NO DATA
1371 | if withData {
1372 | builder.WriteString("\nWITH DATA")
1373 | } else {
1374 | builder.WriteString("\nWITH NO DATA")
1375 | }
1376 |
1377 | // Execute the statement
1378 | _, err := useCase.ExecuteStatement(ctx, dbID, builder.String(), nil)
1379 | if err != nil {
1380 | return nil, fmt.Errorf("failed to create continuous aggregate: %w", err)
1381 | }
1382 |
1383 | // Add refresh policy if requested
1384 | if refreshPolicy && refreshInterval != "" {
1385 | policySQL := fmt.Sprintf("SELECT add_continuous_aggregate_policy('%s', start_offset => INTERVAL '1 week', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '%s')", viewName, refreshInterval)
1386 | _, err := useCase.ExecuteStatement(ctx, dbID, policySQL, nil)
1387 | if err != nil {
1388 | return map[string]interface{}{
1389 | "message": fmt.Sprintf("Created continuous aggregate '%s' but failed to add refresh policy: %s", viewName, err.Error()),
1390 | }, nil
1391 | }
1392 | }
1393 |
1394 | return map[string]interface{}{
1395 | "message": fmt.Sprintf("Successfully created continuous aggregate '%s'", viewName),
1396 | "sql": builder.String(),
1397 | }, nil
1398 | }
1399 |
1400 | // handleRefreshContinuousAggregate handles the refresh_continuous_aggregate operation
1401 | func (t *TimescaleDBTool) handleRefreshContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1402 | // Extract required parameters
1403 | viewName, ok := request.Parameters["view_name"].(string)
1404 | if !ok || viewName == "" {
1405 | return nil, fmt.Errorf("view_name parameter is required")
1406 | }
1407 |
1408 | // Extract optional parameters
1409 | startTimeStr := getStringParam(request.Parameters, "start_time")
1410 | endTimeStr := getStringParam(request.Parameters, "end_time")
1411 |
1412 | // Build the SQL statement to refresh a continuous aggregate
1413 | var sql string
1414 | if startTimeStr != "" && endTimeStr != "" {
1415 | sql = fmt.Sprintf("CALL refresh_continuous_aggregate('%s', '%s', '%s')",
1416 | viewName, startTimeStr, endTimeStr)
1417 | } else {
1418 | sql = fmt.Sprintf("CALL refresh_continuous_aggregate('%s', NULL, NULL)", viewName)
1419 | }
1420 |
1421 | // Execute the statement
1422 | _, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1423 | if err != nil {
1424 | return nil, fmt.Errorf("failed to refresh continuous aggregate: %w", err)
1425 | }
1426 |
1427 | return map[string]interface{}{
1428 | "message": fmt.Sprintf("Successfully refreshed continuous aggregate '%s'", viewName),
1429 | }, nil
1430 | }
1431 |
1432 | // handleDropContinuousAggregate handles the drop_continuous_aggregate operation
1433 | func (t *TimescaleDBTool) handleDropContinuousAggregate(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1434 | // Extract required parameters
1435 | viewName, ok := request.Parameters["view_name"].(string)
1436 | if !ok || viewName == "" {
1437 | return nil, fmt.Errorf("view_name parameter is required")
1438 | }
1439 |
1440 | // Extract optional parameters
1441 | cascade := getBoolParam(request.Parameters, "cascade")
1442 |
1443 | // Build the SQL statement to drop a continuous aggregate
1444 | sql := fmt.Sprintf("DROP MATERIALIZED VIEW %s", viewName)
1445 |
1446 | if cascade {
1447 | sql += " CASCADE"
1448 | }
1449 |
1450 | // Execute the statement
1451 | _, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1452 | if err != nil {
1453 | return nil, fmt.Errorf("failed to drop continuous aggregate: %w", err)
1454 | }
1455 |
1456 | return map[string]interface{}{
1457 | "message": fmt.Sprintf("Successfully dropped continuous aggregate '%s'", viewName),
1458 | }, nil
1459 | }
1460 |
1461 | // handleListContinuousAggregates handles the list_continuous_aggregates operation
1462 | func (t *TimescaleDBTool) handleListContinuousAggregates(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1463 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1464 | dbType, err := useCase.GetDatabaseType(dbID)
1465 | if err != nil {
1466 | return nil, fmt.Errorf("failed to get database type: %w", err)
1467 | }
1468 |
1469 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1470 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1471 | }
1472 |
1473 | // Build the SQL query to list continuous aggregates
1474 | sql := `
1475 | SELECT view_name, source_table, time_column, bucket_interval, aggregations, where_condition, with_data, refresh_policy, refresh_interval
1476 | FROM timescaledb_information.continuous_aggregates
1477 | `
1478 |
1479 | // Execute the statement
1480 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1481 | if err != nil {
1482 | return nil, fmt.Errorf("failed to list continuous aggregates: %w", err)
1483 | }
1484 |
1485 | return map[string]interface{}{
1486 | "message": "Successfully retrieved continuous aggregates list",
1487 | "details": result,
1488 | }, nil
1489 | }
1490 |
1491 | // handleGetContinuousAggregateInfo handles the get_continuous_aggregate_info operation
1492 | func (t *TimescaleDBTool) handleGetContinuousAggregateInfo(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1493 | // Extract required parameters
1494 | viewName, ok := request.Parameters["view_name"].(string)
1495 | if !ok || viewName == "" {
1496 | return nil, fmt.Errorf("view_name parameter is required")
1497 | }
1498 |
1499 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1500 | dbType, err := useCase.GetDatabaseType(dbID)
1501 | if err != nil {
1502 | return nil, fmt.Errorf("failed to get database type: %w", err)
1503 | }
1504 |
1505 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1506 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1507 | }
1508 |
1509 | // Build the SQL query to get continuous aggregate information
1510 | sql := fmt.Sprintf(`
1511 | SELECT
1512 | view_name,
1513 | source_table,
1514 | time_column,
1515 | bucket_interval,
1516 | aggregations,
1517 | where_condition,
1518 | with_data,
1519 | refresh_policy,
1520 | refresh_interval
1521 | FROM
1522 | timescaledb_information.continuous_aggregates
1523 | WHERE
1524 | view_name = '%s'
1525 | `, viewName)
1526 |
1527 | // Execute the statement
1528 | result, err := useCase.ExecuteStatement(ctx, dbID, sql, nil)
1529 | if err != nil {
1530 | return nil, fmt.Errorf("failed to get continuous aggregate info: %w", err)
1531 | }
1532 |
1533 | return map[string]interface{}{
1534 | "message": fmt.Sprintf("Successfully retrieved continuous aggregate information for '%s'", viewName),
1535 | "details": result,
1536 | }, nil
1537 | }
1538 |
1539 | // handleAddContinuousAggregatePolicy handles the add_continuous_aggregate_policy operation
1540 | func (t *TimescaleDBTool) handleAddContinuousAggregatePolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1541 | // Extract required parameters
1542 | viewName, ok := request.Parameters["view_name"].(string)
1543 | if !ok || viewName == "" {
1544 | return nil, fmt.Errorf("view_name parameter is required")
1545 | }
1546 |
1547 | startOffset, ok := request.Parameters["start_offset"].(string)
1548 | if !ok || startOffset == "" {
1549 | return nil, fmt.Errorf("start_offset parameter is required")
1550 | }
1551 |
1552 | endOffset, ok := request.Parameters["end_offset"].(string)
1553 | if !ok || endOffset == "" {
1554 | return nil, fmt.Errorf("end_offset parameter is required")
1555 | }
1556 |
1557 | scheduleInterval, ok := request.Parameters["schedule_interval"].(string)
1558 | if !ok || scheduleInterval == "" {
1559 | return nil, fmt.Errorf("schedule_interval parameter is required")
1560 | }
1561 |
1562 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1563 | dbType, err := useCase.GetDatabaseType(dbID)
1564 | if err != nil {
1565 | return nil, fmt.Errorf("failed to get database type: %w", err)
1566 | }
1567 |
1568 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1569 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1570 | }
1571 |
1572 | // Build the SQL statement to add a continuous aggregate policy
1573 | sql := fmt.Sprintf("SELECT add_continuous_aggregate_policy('%s', start_offset => INTERVAL '%s', end_offset => INTERVAL '%s', schedule_interval => INTERVAL '%s')",
1574 | viewName, startOffset, endOffset, scheduleInterval)
1575 |
1576 | // Execute the statement
1577 | _, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
1578 | if err != nil {
1579 | return nil, fmt.Errorf("failed to add continuous aggregate policy: %w", err)
1580 | }
1581 |
1582 | return map[string]interface{}{
1583 | "message": fmt.Sprintf("Successfully added continuous aggregate policy to '%s'", viewName),
1584 | }, nil
1585 | }
1586 |
1587 | // handleRemoveContinuousAggregatePolicy handles the remove_continuous_aggregate_policy operation
1588 | func (t *TimescaleDBTool) handleRemoveContinuousAggregatePolicy(ctx context.Context, request server.ToolCallRequest, dbID string, useCase UseCaseProvider) (interface{}, error) {
1589 | // Extract required parameters
1590 | viewName, ok := request.Parameters["view_name"].(string)
1591 | if !ok || viewName == "" {
1592 | return nil, fmt.Errorf("view_name parameter is required")
1593 | }
1594 |
1595 | // Check if the database is PostgreSQL (TimescaleDB requires PostgreSQL)
1596 | dbType, err := useCase.GetDatabaseType(dbID)
1597 | if err != nil {
1598 | return nil, fmt.Errorf("failed to get database type: %w", err)
1599 | }
1600 |
1601 | if !strings.Contains(strings.ToLower(dbType), "postgres") {
1602 | return nil, fmt.Errorf("TimescaleDB operations are only supported on PostgreSQL databases")
1603 | }
1604 |
1605 | // Build the SQL statement to remove a continuous aggregate policy
1606 | sql := fmt.Sprintf("SELECT remove_continuous_aggregate_policy('%s')", viewName)
1607 |
1608 | // Execute the statement
1609 | _, err = useCase.ExecuteStatement(ctx, dbID, sql, nil)
1610 | if err != nil {
1611 | return nil, fmt.Errorf("failed to remove continuous aggregate policy: %w", err)
1612 | }
1613 |
1614 | return map[string]interface{}{
1615 | "message": fmt.Sprintf("Successfully removed continuous aggregate policy from '%s'", viewName),
1616 | }, nil
1617 | }
1618 |
1619 | // getStringParam safely extracts a string parameter from a parameter map
1620 | func getStringParam(params map[string]interface{}, key string) string {
1621 | if value, ok := params[key].(string); ok {
1622 | return value
1623 | }
1624 | return ""
1625 | }
1626 |
1627 | // getBoolParam safely extracts a boolean parameter from a parameter map
1628 | func getBoolParam(params map[string]interface{}, key string) bool {
1629 | if value, ok := params[key].(bool); ok {
1630 | return value
1631 | }
1632 | return false
1633 | }
1634 |
1635 | // buildCreateHypertableSQL constructs the SQL statement to create a hypertable
1636 | func buildCreateHypertableSQL(table, timeColumn, chunkTimeInterval, partitioningColumn string, ifNotExists bool) string {
1637 | var args []string
1638 |
1639 | // Add required arguments: table name and time column
1640 | args = append(args, fmt.Sprintf("'%s'", table))
1641 | args = append(args, fmt.Sprintf("'%s'", timeColumn))
1642 |
1643 | // Build optional parameters
1644 | var options []string
1645 |
1646 | if chunkTimeInterval != "" {
1647 | options = append(options, fmt.Sprintf("chunk_time_interval => interval '%s'", chunkTimeInterval))
1648 | }
1649 |
1650 | if partitioningColumn != "" {
1651 | options = append(options, fmt.Sprintf("partitioning_column => '%s'", partitioningColumn))
1652 | }
1653 |
1654 | options = append(options, fmt.Sprintf("if_not_exists => %t", ifNotExists))
1655 |
1656 | // Construct the full SQL statement
1657 | sql := fmt.Sprintf("SELECT create_hypertable(%s", strings.Join(args, ", "))
1658 |
1659 | if len(options) > 0 {
1660 | sql += ", " + strings.Join(options, ", ")
1661 | }
1662 |
1663 | sql += ")"
1664 |
1665 | return sql
1666 | }
1667 |
1668 | // RegisterTimescaleDBTools registers TimescaleDB tools
1669 | func RegisterTimescaleDBTools(registry interface{}) error {
1670 | // Cast the registry to the expected type
1671 | toolRegistry, ok := registry.(*ToolTypeFactory)
1672 | if !ok {
1673 | return fmt.Errorf("invalid registry type")
1674 | }
1675 |
1676 | // Create the TimescaleDB tool
1677 | tool := NewTimescaleDBTool()
1678 |
1679 | // Register it with the factory
1680 | toolRegistry.Register(tool)
1681 |
1682 | return nil
1683 | }
1684 |
```