This is page 44 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_endpoints.py:
--------------------------------------------------------------------------------
```python
1 | """FastAPI endpoints for UMS API."""
2 |
3 | import json
4 | import math
5 | import sqlite3
6 | from collections import Counter, defaultdict, deque
7 | from datetime import datetime
8 | from pathlib import Path
9 | from threading import Lock
10 | from typing import Any, Dict, List, Optional
11 |
12 | from fastapi import Body, FastAPI, HTTPException, Query
13 | from fastapi import Path as ApiPath
14 | from fastapi.responses import FileResponse, JSONResponse, RedirectResponse, Response
15 |
16 | from .ums_models import *
17 | from .ums_services import *
18 | from .ums_database import get_db_connection
19 |
20 | def setup_ums_api(app: FastAPI) -> None:
21 | """
22 | Set up all UMS API endpoints on the provided FastAPI app.
23 |
24 | This function registers all the UMS (Unified Memory System) API endpoints
25 | including cognitive states, action monitoring, performance profiling,
26 | working memory, artifacts, and memory quality management.
27 |
28 | Args:
29 | app: FastAPI application instance to register endpoints on
30 | """
31 |
# ---------- Setup and Helper Functions ----------

# Legacy alias for older route-registration code.
# BUG FIX: the original read `app = api_app`, which rebinds the `app`
# parameter to a name not defined in this scope (NameError at call time)
# and discards the FastAPI instance passed in. The backward-compatibility
# alias goes the other way — which is also the only direction consistent
# with the `noqa: F841` (unused variable) suppression on this line.
api_app = app  # DO NOT REMOVE – keeps backward-compatibility  # noqa: F841
# Note: app parameter is passed to this function
# -------------------------------------------------
# UMS Explorer: static assets, DB helpers, and APIs
# -------------------------------------------------

# Paths & database setup: <repo>/ultimate_mcp_server/tools holds the static
# explorer assets; <repo>/storage holds the SQLite database file.
project_root = Path(__file__).resolve().parent.parent.parent
tools_dir = project_root / "ultimate_mcp_server" / "tools"
storage_dir = project_root / "storage"
DATABASE_PATH = str(storage_dir / "unified_agent_memory.db")
46 |
def get_db_connection() -> sqlite3.Connection:
    """Open a connection to the UMS SQLite database with dict-like row access."""
    # NOTE(review): this nested helper shadows the get_db_connection imported
    # from .ums_database at module level — confirm which one is intended.
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    return connection
52 |
53 | # ---------- Helper functions ----------
54 | def _dict_depth(d: Dict[str, Any], depth: int = 0) -> int:
55 | if not isinstance(d, dict) or not d:
56 | return depth
57 | return max(_dict_depth(v, depth + 1) for v in d.values())
58 |
59 | def _count_values(d: Dict[str, Any]) -> int:
60 | cnt = 0
61 | for v in d.values():
62 | if isinstance(v, dict):
63 | cnt += _count_values(v)
64 | elif isinstance(v, list):
65 | cnt += len(v)
66 | else:
67 | cnt += 1
68 | return cnt
69 |
def calculate_state_complexity(state_data: Dict[str, Any]) -> float:
    """Heuristic 0-100 complexity score from key count, nesting depth, and leaf count."""
    if not state_data:
        return 0.0
    raw_score = (
        5 * len(state_data)
        + 10 * _dict_depth(state_data)
        + 0.5 * _count_values(state_data)
    )
    return round(min(100.0, raw_score), 2)
77 |
def compute_state_diff(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Structural diff of two state dicts plus a 0-100 change-magnitude score."""
    added: Dict[str, Any] = {}
    removed: Dict[str, Any] = {}
    modified: Dict[str, Any] = {}
    all_keys = set(a) | set(b)
    for key in all_keys:
        if key not in a:
            added[key] = b[key]
        elif key not in b:
            removed[key] = a[key]
        elif a[key] != b[key]:
            modified[key] = {"before": a[key], "after": b[key]}
    changed = len(added) + len(removed) + len(modified)
    magnitude = (changed / len(all_keys)) * 100 if all_keys else 0.0
    return {"added": added, "removed": removed, "modified": modified, "magnitude": magnitude}
95 |
96 | # ---------- Pydantic models ----------
class CognitiveState(BaseModel):
    """One cognitive state row enriched with derived metrics."""

    state_id: str
    timestamp: float  # epoch seconds
    formatted_timestamp: str  # ISO-8601 rendering of `timestamp`
    state_type: str
    description: Optional[str] = None
    workflow_id: Optional[str] = None
    workflow_title: Optional[str] = None
    complexity_score: float  # heuristic 0-100 (see calculate_state_complexity)
    change_magnitude: float  # 0-100 diff magnitude vs. previous state in the page
    age_minutes: float  # minutes elapsed since `timestamp`
    memory_count: int  # COUNT(DISTINCT memory_id) over the same workflow
    action_count: int  # COUNT(DISTINCT action_id) over the same workflow
    state_data: Dict[str, Any] = {}  # parsed JSON payload ({} if unparsable)

class CognitiveStatesResponse(BaseModel):
    """Paged list of cognitive states."""

    states: List[CognitiveState]
    total: int  # number of states in this page
    has_more: bool  # heuristic: True when the page was completely filled
116 |
117 | # ---------- Static assets ----------
118 | # ---------- Root Discovery Endpoint ----------
119 |
@app.get(
    "/",
    summary="MCP Server Discovery",
    description="Returns information about the MCP server endpoint",
    response_description="Server information including transport type and endpoint path",
)
async def root_endpoint():  # noqa: D401
    """Root endpoint for MCP server discovery"""
    # Static discovery payload; the X-MCP-* headers let clients identify the
    # server without parsing the body.
    return JSONResponse(
        content={
            "type": "mcp-server",
            "version": "1.0.0",
            "transport": "http",
            "endpoint": "/mcp",
            "api_docs": "/api/docs",
            "api_spec": "/api/openapi.json",
        },
        headers={
            "X-MCP-Server": "true",
            "X-MCP-Version": "1.0.0",
            "X-MCP-Transport": "http",
        },
    )
142 |
@app.get("/tools/ums_explorer.html", include_in_schema=False)
async def serve_ums_explorer():
    """Serve the UMS Explorer single-page app from the tools directory."""
    html_path = tools_dir / "ums_explorer.html"
    if not html_path.exists():
        return JSONResponse({"error": "UMS Explorer HTML file not found"}, status_code=404)
    return FileResponse(str(html_path), media_type="text/html")
149 |
@app.get("/storage/unified_agent_memory.db", include_in_schema=False)
async def serve_database():
    """Serve the raw SQLite database file for download by the explorer UI."""
    db_path = storage_dir / "unified_agent_memory.db"
    if not db_path.exists():
        return JSONResponse({"error": "Database file not found"}, status_code=404)
    return FileResponse(
        str(db_path),
        media_type="application/x-sqlite3",
        filename="unified_agent_memory.db",
    )
160 |
@app.get("/ums-explorer", include_in_schema=False)
async def ums_explorer_redirect():
    # Convenience alias for the explorer page.
    # NOTE(review): redirect targets /api/tools/... while the static route in
    # this module is registered at /tools/... — presumably the app is mounted
    # under an /api prefix; confirm against the server setup.
    return RedirectResponse(url="/api/tools/ums_explorer.html")
164 |
165 | # ---------- Cognitive-states endpoint ----------
@app.get(
    "/cognitive-states", response_model=CognitiveStatesResponse, tags=["Cognitive States"]
)
async def get_cognitive_states(
    start_time: Optional[float] = Query(None, ge=0),
    end_time: Optional[float] = Query(None, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    pattern_type: Optional[str] = Query(None, regex="^[A-Za-z_]+$"),
) -> CognitiveStatesResponse:
    """List cognitive states (newest first) with complexity and change metrics.

    Args:
        start_time / end_time: optional epoch-second bounds on cs.timestamp.
        limit / offset: pagination window.
        pattern_type: optional exact match on cs.state_type.

    Returns:
        CognitiveStatesResponse; `has_more` is a heuristic (page was full).

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        sql = (
            "SELECT cs.*, w.title AS workflow_title, "
            "COUNT(DISTINCT m.memory_id) AS memory_count, "
            "COUNT(DISTINCT a.action_id) AS action_count "
            "FROM cognitive_timeline_states cs "
            "LEFT JOIN workflows w ON cs.workflow_id = w.workflow_id "
            "LEFT JOIN memories m ON cs.workflow_id = m.workflow_id "
            "LEFT JOIN actions a ON cs.workflow_id = a.workflow_id "
            "WHERE 1=1"
        )
        params: List[Any] = []
        # BUG FIX: explicit None checks — 0 is a legal bound (ge=0) and the
        # original truthiness tests silently ignored it.
        if start_time is not None:
            sql += " AND cs.timestamp >= ?"
            params.append(start_time)
        if end_time is not None:
            sql += " AND cs.timestamp <= ?"
            params.append(end_time)
        if pattern_type is not None:
            sql += " AND cs.state_type = ?"
            params.append(pattern_type)
        sql += " GROUP BY cs.state_id ORDER BY cs.timestamp DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        cur.execute(sql, params)
        cols = [d[0] for d in cur.description]
        rows = [dict(zip(cols, r, strict=False)) for r in cur.fetchall()]
        states: List[CognitiveState] = []
        for r in rows:
            try:
                data = json.loads(r.get("state_data", "{}"))
            except Exception:
                data = {}  # malformed JSON payloads are treated as empty
            states.append(
                CognitiveState(
                    state_id=r["state_id"],
                    timestamp=r["timestamp"],
                    formatted_timestamp=datetime.fromtimestamp(r["timestamp"]).isoformat(),
                    state_type=r.get("state_type", "unknown"),
                    description=r.get("description"),
                    workflow_id=r.get("workflow_id"),
                    workflow_title=r.get("workflow_title"),
                    complexity_score=calculate_state_complexity(data),
                    change_magnitude=0.0,
                    age_minutes=(datetime.now().timestamp() - r["timestamp"]) / 60,
                    memory_count=r.get("memory_count", 0),
                    action_count=r.get("action_count", 0),
                    state_data=data,
                )
            )
        # Rows are newest-first, so states[i + 1] is the chronologically
        # previous state of states[i].
        for i in range(len(states) - 1):
            diff = compute_state_diff(states[i + 1].state_data, states[i].state_data)
            states[i].change_magnitude = diff["magnitude"]
        return CognitiveStatesResponse(
            states=states, total=len(states), has_more=len(states) == limit
        )
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: {e}") from e
    finally:
        # BUG FIX: the connection was previously closed only on the success
        # path, leaking it whenever an exception was raised.
        conn.close()
238 |
239 | # ---------- Timeline helper functions ----------
def generate_timeline_segments(
    timeline_data: List[Dict[str, Any]], granularity: str, hours: int
) -> List[Dict[str, Any]]:
    """Generate timeline segments summarising state counts / complexity over time.

    Args:
        timeline_data: dicts carrying "timestamp", "complexity_score",
            "change_magnitude" and "state_type" keys.
        granularity: "second" (1s buckets), "minute" (60s), anything else 3600s.
        hours: unused here; kept for signature compatibility with callers.

    Returns:
        Chronological list of non-empty bucket summaries.
    """
    if not timeline_data:
        return []

    start_ts = min(item["timestamp"] for item in timeline_data)
    end_ts = max(item["timestamp"] for item in timeline_data)
    seg_seconds = 1 if granularity == "second" else 60 if granularity == "minute" else 3600

    segments: List[Dict[str, Any]] = []
    current = start_ts
    # BUG FIX: the original condition `current < end_ts` dropped any item whose
    # timestamp equals end_ts when (end_ts - start_ts) is an exact multiple of
    # seg_seconds, and produced NO segments at all for a single data point
    # (start_ts == end_ts). `<=` guarantees every item lands in a bucket; empty
    # trailing buckets are filtered by the `if seg_states` guard.
    # (Counter comes from the module-level import; the original re-imported it
    # locally on every call.)
    while current <= end_ts:
        seg_end = current + seg_seconds
        seg_states = [it for it in timeline_data if current <= it["timestamp"] < seg_end]
        if seg_states:
            segments.append(
                {
                    "start_time": current,
                    "end_time": seg_end,
                    "state_count": len(seg_states),
                    "avg_complexity": sum(s["complexity_score"] for s in seg_states)
                    / len(seg_states),
                    "max_change_magnitude": max(s["change_magnitude"] for s in seg_states),
                    "dominant_type": Counter(
                        s["state_type"] for s in seg_states
                    ).most_common(1)[0][0],
                }
            )
        current = seg_end
    return segments
274 |
def calculate_timeline_stats(timeline_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return aggregate stats about timeline complexity / changes.

    Expects dicts with "complexity_score", "change_magnitude" and "state_type"
    keys; returns {} for an empty timeline.
    """
    if not timeline_data:
        return {}

    # Counter comes from the module-level import; the original shadowed it with
    # a redundant local `from collections import Counter` on every call.
    complexities = [it["complexity_score"] for it in timeline_data]
    # Only positive magnitudes count as real transitions (the first state in a
    # sequence always has magnitude 0).
    changes = [it["change_magnitude"] for it in timeline_data if it["change_magnitude"] > 0]
    stypes = Counter(it["state_type"] for it in timeline_data)
    return {
        "avg_complexity": sum(complexities) / len(complexities),
        "max_complexity": max(complexities),
        "avg_change_magnitude": (sum(changes) / len(changes)) if changes else 0,
        "max_change_magnitude": max(changes) if changes else 0,
        "most_common_type": stypes.most_common(1)[0][0] if stypes else None,
        "type_distribution": dict(stypes),
    }
292 |
293 | # ---------- Timeline Pydantic models ----------
class TimelineState(BaseModel):
    """One point on the cognitive-state timeline."""

    state_id: str
    timestamp: float  # epoch seconds
    formatted_time: str  # ISO-8601 rendering of `timestamp`
    state_type: str
    workflow_id: Optional[str] = None
    description: Optional[str] = None
    sequence_number: int  # 1-based ROW_NUMBER() ordered by timestamp
    complexity_score: float  # heuristic 0-100 (see calculate_state_complexity)
    change_magnitude: float  # diff magnitude vs. the immediately preceding state

class TimelineSummaryStats(BaseModel):
    """Aggregates over the returned timeline."""

    avg_complexity: float
    total_transitions: int  # len(timeline) - 1 (0 when fewer than two states)
    max_change_magnitude: float

class CognitiveTimelineResponse(BaseModel):
    """Response body for GET /cognitive-states/timeline."""

    timeline_data: List[TimelineState]
    total_states: int
    time_range_hours: int  # echoes the `hours` query parameter
    granularity: str  # echoes the `granularity` query parameter
    summary_stats: TimelineSummaryStats
316 |
317 | # ---------- Timeline endpoint ----------
@app.get(
    "/cognitive-states/timeline",
    response_model=CognitiveTimelineResponse,
    tags=["Cognitive States"],
    summary="Get cognitive state timeline for visualization",
)
async def get_cognitive_timeline(
    hours: int = Query(24, ge=1, le=168),
    granularity: str = Query("hour", regex="^(second|minute|hour)$"),
) -> CognitiveTimelineResponse:
    """Return states from the last `hours` hours in chronological order.

    Each entry carries a complexity score and the diff magnitude against the
    immediately preceding state.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        since_ts = datetime.now().timestamp() - hours * 3600
        cur.execute(
            """
            SELECT state_id, timestamp, state_type, state_data, workflow_id, description,
                   ROW_NUMBER() OVER (ORDER BY timestamp) AS seq
            FROM cognitive_timeline_states WHERE timestamp >= ? ORDER BY timestamp ASC
            """,
            (since_ts,),
        )
        cols = [d[0] for d in cur.description]
        rows = [dict(zip(cols, r, strict=False)) for r in cur.fetchall()]

        timeline: List[TimelineState] = []
        prev_data: Optional[Dict[str, Any]] = None
        for r in rows:
            try:
                data = json.loads(r.get("state_data", "{}"))
            except Exception:
                data = {}
            # Reuse the previous row's parsed payload instead of decoding each
            # row's JSON twice as the original did.
            change = (
                compute_state_diff(prev_data, data)["magnitude"]
                if prev_data is not None
                else 0.0
            )
            timeline.append(
                TimelineState(
                    state_id=r["state_id"],
                    timestamp=r["timestamp"],
                    formatted_time=datetime.fromtimestamp(r["timestamp"]).isoformat(),
                    state_type=r["state_type"],
                    workflow_id=r.get("workflow_id"),
                    description=r.get("description"),
                    sequence_number=r["seq"],
                    complexity_score=calculate_state_complexity(data),
                    change_magnitude=change,
                )
            )
            prev_data = data

        stats = TimelineSummaryStats(
            avg_complexity=sum(t.complexity_score for t in timeline) / len(timeline)
            if timeline
            else 0,
            total_transitions=len(timeline) - 1 if len(timeline) > 1 else 0,
            max_change_magnitude=max((t.change_magnitude for t in timeline), default=0),
        )
        return CognitiveTimelineResponse(
            timeline_data=timeline,
            total_states=len(timeline),
            time_range_hours=hours,
            granularity=granularity,
            summary_stats=stats,
        )
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: {e}") from e
    finally:
        # BUG FIX: close the connection on all paths, not just success.
        conn.close()
389 |
390 | # ---------- Detailed state models ----------
class Memory(BaseModel):
    """Compact memory record attached to a workflow."""

    memory_id: str
    memory_type: str
    content: str
    importance: float
    created_at: float  # epoch seconds

class Action(BaseModel):
    """Compact action record attached to a workflow."""

    action_id: str
    action_type: str
    tool_name: str
    status: str
    started_at: float  # epoch seconds

class DetailedCognitiveState(BaseModel):
    """Full view of one cognitive state plus related workflow context."""

    state_id: str
    timestamp: float  # epoch seconds
    formatted_timestamp: str  # ISO-8601 rendering of `timestamp`
    state_type: str
    description: Optional[str] = None
    workflow_id: Optional[str] = None
    workflow_title: Optional[str] = None
    workflow_goal: Optional[str] = None
    state_data: Dict[str, Any]  # parsed JSON payload ({} if unparsable)
    complexity_score: float
    memories: List[Memory] = []  # up to 20 most recent, same workflow
    actions: List[Action] = []  # up to 20 most recent, same workflow
418 |
419 | # ---------- Detailed state endpoint ----------
@app.get(
    "/cognitive-states/{state_id}",
    response_model=DetailedCognitiveState,
    tags=["Cognitive States"],
    summary="Get detailed cognitive state information",
)
async def get_cognitive_state_detail(
    state_id: str = ApiPath(..., regex="^[A-Za-z0-9_-]+$"),
) -> DetailedCognitiveState:
    """Return one cognitive state plus up to 20 recent memories and actions
    from the same workflow.

    Raises:
        HTTPException: 404 if the state does not exist; 500 on database or
            unexpected errors.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            """
            SELECT cs.*, w.title AS workflow_title, w.goal AS workflow_goal
            FROM cognitive_timeline_states cs LEFT JOIN workflows w ON cs.workflow_id = w.workflow_id
            WHERE cs.state_id = ?
            """,
            (state_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(
                status_code=404, detail=f"Cognitive state '{state_id}' not found"
            )
        cols = [d[0] for d in cur.description]
        state = dict(zip(cols, row, strict=False))
        try:
            data = json.loads(state.get("state_data", "{}"))
        except Exception:
            data = {}

        # Most recent memories attached to the same workflow (capped at 20).
        cur.execute(
            "SELECT memory_id, memory_type, content, importance, created_at FROM memories WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 20",
            (state.get("workflow_id"),),
        )
        mem_cols = [d[0] for d in cur.description]
        memories = [Memory(**dict(zip(mem_cols, m, strict=False))) for m in cur.fetchall()]

        # Most recent actions for the same workflow (capped at 20).
        cur.execute(
            "SELECT action_id, action_type, tool_name, status, started_at FROM actions WHERE workflow_id = ? ORDER BY started_at DESC LIMIT 20",
            (state.get("workflow_id"),),
        )
        act_cols = [d[0] for d in cur.description]
        actions = [Action(**dict(zip(act_cols, a, strict=False))) for a in cur.fetchall()]
        return DetailedCognitiveState(
            state_id=state["state_id"],
            timestamp=state["timestamp"],
            formatted_timestamp=datetime.fromtimestamp(state["timestamp"]).isoformat(),
            state_type=state.get("state_type", "unknown"),
            description=state.get("description"),
            workflow_id=state.get("workflow_id"),
            workflow_title=state.get("workflow_title"),
            workflow_goal=state.get("workflow_goal"),
            state_data=data,
            complexity_score=calculate_state_complexity(data),
            memories=memories,
            actions=actions,
        )
    except HTTPException:
        raise
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: {e}") from e
    finally:
        # BUG FIX: the connection previously leaked on the 404 path and on any
        # exception (close was only reached on success); close it on all paths.
        conn.close()
488 |
489 | # ---------- Pattern analysis helpers ----------
def find_cognitive_patterns(
    states: List[Dict[str, Any]], min_length: int, similarity_threshold: float
) -> List[Dict[str, Any]]:
    """Find recurring patterns in cognitive states.

    Groups states by state_type, then slides over each group looking for two
    back-to-back subsequences of equal length whose pairwise similarity (via
    calculate_sequence_similarity) meets the threshold.

    Args:
        states: chronologically ordered state dicts with "state_type",
            "timestamp" and "state_data" keys.
        min_length: minimum subsequence length to consider.
        similarity_threshold: minimum similarity in [0, 1] to report.

    Returns:
        Detected patterns sorted by similarity, descending.
    """
    patterns: List[Dict[str, Any]] = []
    # defaultdict comes from the module-level import; the original re-imported
    # it locally on every call.
    type_sequences: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for state in states:
        type_sequences[state["state_type"]].append(state)
    for state_type, sequence in type_sequences.items():
        if len(sequence) < min_length * 2:
            continue  # need room for two adjacent occurrences
        for length in range(min_length, len(sequence) // 2 + 1):
            for start in range(len(sequence) - length * 2 + 1):
                first = sequence[start : start + length]
                second = sequence[start + length : start + length * 2]
                similarity = calculate_sequence_similarity(first, second)
                if similarity >= similarity_threshold:
                    patterns.append(
                        {
                            "type": f"repeating_{state_type}",
                            "length": length,
                            "similarity": similarity,
                            "occurrences": 2,
                            "first_occurrence": first[0]["timestamp"],
                            "pattern_description": f"Repeating {state_type} sequence of {length} states",
                        }
                    )
    return sorted(patterns, key=lambda p: p["similarity"], reverse=True)
519 |
def calculate_sequence_similarity(
    seq1: List[Dict[str, Any]], seq2: List[Dict[str, Any]]
) -> float:
    """Mean per-position state similarity; 0.0 when the lengths differ."""
    if len(seq1) != len(seq2):
        return 0.0
    pair_scores = [
        calculate_single_state_similarity(left, right)
        for left, right in zip(seq1, seq2, strict=False)
    ]
    return sum(pair_scores) / len(seq1)
531 |
def calculate_single_state_similarity(
    state1: Dict[str, Any], state2: Dict[str, Any]
) -> float:
    """Similarity in [0, 1]: mean of key-set Jaccard and exact-value agreement."""
    left = state1.get("state_data", {})
    right = state2.get("state_data", {})
    if not left and not right:
        return 1.0
    if not left or not right:
        return 0.0
    left_keys, right_keys = set(left), set(right)
    union = left_keys | right_keys
    shared = left_keys & right_keys
    key_score = len(shared) / len(union) if union else 1.0
    value_score = (
        sum(1 for key in shared if left[key] == right[key]) / len(shared)
        if shared
        else 0.0
    )
    return (key_score + value_score) / 2
551 |
552 | def analyze_state_transitions(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
553 | """Analyze transitions between cognitive states"""
554 | from collections import defaultdict
555 |
556 | transitions = defaultdict(int)
557 | for i in range(len(states) - 1):
558 | current_type = states[i]["state_type"]
559 | next_type = states[i + 1]["state_type"]
560 | transition = f"{current_type} → {next_type}"
561 | transitions[transition] += 1
562 | sorted_transitions = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
563 | return [
564 | {
565 | "transition": transition,
566 | "count": count,
567 | "percentage": (count / (len(states) - 1)) * 100 if len(states) > 1 else 0,
568 | }
569 | for transition, count in sorted_transitions
570 | ]
571 |
def detect_cognitive_anomalies(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flag states whose complexity score is a statistical outlier (|z| > 2)."""
    if len(states) < 3:
        return []  # too few samples for a meaningful z-score
    complexities = [calculate_state_complexity(s.get("state_data", {})) for s in states]
    n = len(complexities)
    mean = sum(complexities) / n
    # Population standard deviation over the window.
    std = (sum((c - mean) ** 2 for c in complexities) / n) ** 0.5
    anomalies: List[Dict[str, Any]] = []
    for state, complexity in zip(states, complexities, strict=False):
        z_score = (complexity - mean) / std if std > 0 else 0
        if abs(z_score) <= 2:
            continue
        anomalies.append(
            {
                "state_id": state["state_id"],
                "timestamp": state["timestamp"],
                "anomaly_type": "complexity_outlier",
                "z_score": z_score,
                "description": f"Unusual complexity: {complexity:.1f} (avg: {mean:.1f})",
                "severity": "high" if abs(z_score) > 3 else "medium",
            }
        )
    return anomalies
599 |
600 | # ---------- Pattern analysis models ----------
class Pattern(BaseModel):
    """A detected repeating subsequence of same-typed states."""

    type: str  # "repeating_<state_type>"
    length: int  # states per occurrence
    similarity: float  # 0-1 pairwise similarity between the two occurrences
    occurrences: int  # always 2 as produced by find_cognitive_patterns
    first_occurrence: float  # epoch seconds of the first state in the pattern
    pattern_description: str

class Transition(BaseModel):
    """Frequency of one state_type -> state_type transition."""

    transition: str  # e.g. "planning → execution"
    count: int
    percentage: float  # share of all observed transitions

class Anomaly(BaseModel):
    """A state flagged as a statistical outlier."""

    state_id: str
    timestamp: float  # epoch seconds
    anomaly_type: str  # currently only "complexity_outlier"
    z_score: float
    description: str
    severity: str  # "high" (|z| > 3) or "medium"

class PatternSummary(BaseModel):
    """Headline numbers for a pattern analysis run."""

    pattern_count: int
    most_common_transition: Optional[Transition] = None
    anomaly_count: int

class CognitivePatternAnalysis(BaseModel):
    """Response body for GET /cognitive-states/patterns."""

    total_states: int
    time_range_hours: int  # echoes the `lookback_hours` query parameter
    patterns: List[Pattern] = []
    transitions: List[Transition] = []
    anomalies: List[Anomaly] = []
    summary: PatternSummary
634 |
635 | # ---------- Pattern analysis endpoint ----------
@app.get(
    "/cognitive-states/patterns",
    response_model=CognitivePatternAnalysis,
    tags=["Cognitive States"],
)
async def analyze_cognitive_patterns(
    lookback_hours: int = Query(24, ge=1, le=720),
    min_pattern_length: int = Query(3, ge=2, le=20),
    similarity_threshold: float = Query(0.7, ge=0.1, le=1.0),
) -> CognitivePatternAnalysis:
    """Analyze recent states for repeating patterns, transitions and anomalies.

    NOTE(review): this route is registered AFTER /cognitive-states/{state_id},
    whose regex also matches the literal segment "patterns"; FastAPI matches
    routes in registration order, so this endpoint may be shadowed — consider
    registering it before the parameterized route.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        since_ts = datetime.now().timestamp() - lookback_hours * 3600
        cur.execute(
            "SELECT state_id, timestamp, state_type, state_data, workflow_id FROM cognitive_timeline_states WHERE timestamp >= ? ORDER BY timestamp ASC",
            (since_ts,),
        )
        cols = [d[0] for d in cur.description]
        states = [dict(zip(cols, row, strict=False)) for row in cur.fetchall()]
        for state in states:
            try:
                state["state_data"] = json.loads(state.get("state_data", "{}"))
            except Exception:
                state["state_data"] = {}  # treat malformed JSON as empty
        patterns = find_cognitive_patterns(states, min_pattern_length, similarity_threshold)
        transitions = analyze_state_transitions(states)
        anomalies = detect_cognitive_anomalies(states)
        summary = PatternSummary(
            pattern_count=len(patterns),
            most_common_transition=Transition(**transitions[0]) if transitions else None,
            anomaly_count=len(anomalies),
        )
        return CognitivePatternAnalysis(
            total_states=len(states),
            time_range_hours=lookback_hours,
            patterns=[Pattern(**p) for p in patterns],
            transitions=[Transition(**t) for t in transitions],
            anomalies=[Anomaly(**a) for a in anomalies],
            summary=summary,
        )
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: {e}") from e
    finally:
        # BUG FIX: close the connection on error paths too (it previously
        # leaked whenever an exception was raised before conn.close()).
        conn.close()
684 |
685 | # ---------- State comparison models ----------
686 | class StateComparisonInfo(BaseModel):
687 | state_id: str
688 | timestamp: float
689 | formatted_timestamp: str
690 |
691 | class StateDiff(BaseModel):
692 | added: Dict[str, Any] = {}
693 | removed: Dict[str, Any] = {}
694 | modified: Dict[str, Dict[str, Any]] = {}
695 | magnitude: float
696 |
class StateComparisonRequest(BaseModel):
    """Request body for POST /cognitive-states/compare (two state IDs)."""

    state_id_1: str = Field(
        ...,
        description="First cognitive state ID for comparison",
        example="state_abc123"
    )
    state_id_2: str = Field(
        ...,
        description="Second cognitive state ID for comparison",
        example="state_xyz789"
    )
708 |
class StateComparisonResponse(BaseModel):
    """Result of comparing two cognitive states.

    ``state_1`` corresponds to the requested ``state_id_1`` and ``state_2``
    to ``state_id_2``; ``time_diff_minutes`` is the absolute gap between
    their timestamps.
    """

    state_1: StateComparisonInfo
    state_2: StateComparisonInfo
    time_diff_minutes: float  # always non-negative (computed with abs())
    diff: StateDiff
714 |
@app.post(
    "/cognitive-states/compare",
    response_model=StateComparisonResponse,
    tags=["Cognitive States"],
    summary="Compare two cognitive states",
    description="""
Perform detailed comparison between two cognitive states to understand:

- **Structural differences** in state data
- **Added, removed, and modified** components
- **Change magnitude** calculation
- **Time differential** between states

Perfect for understanding how cognitive states evolve and what changes between specific points in time.
""",
    responses={
        200: {
            "description": "Detailed comparison results",
            "content": {
                "application/json": {
                    "example": {
                        "state_1": {
                            "state_id": "state_abc123",
                            "timestamp": 1703980800.0,
                            "formatted_timestamp": "2024-01-01T00:00:00"
                        },
                        "state_2": {
                            "state_id": "state_xyz789",
                            "timestamp": 1703984400.0,
                            "formatted_timestamp": "2024-01-01T01:00:00"
                        },
                        "time_diff_minutes": 60.0,
                        "diff": {
                            "added": {
                                "new_insight": "PDF contains financial data",
                                "confidence": 0.95
                            },
                            "removed": {
                                "initial_assumption": "Document is text-only"
                            },
                            "modified": {
                                "tool_preference": {
                                    "before": "file_reader",
                                    "after": "smart_browser"
                                }
                            },
                            "magnitude": 45.5
                        }
                    }
                }
            }
        },
        400: {
            "description": "Invalid request - both state IDs required",
            "content": {
                "application/json": {
                    "example": {"detail": "Both state_id_1 and state_id_2 are required"}
                }
            }
        },
        404: {
            "description": "One or both states not found",
            "content": {
                "application/json": {
                    "example": {"detail": "State with ID 'state_abc123' not found"}
                }
            }
        },
        500: {"description": "Internal server error"}
    }
)
async def compare_cognitive_states(
    request: StateComparisonRequest,
) -> StateComparisonResponse:
    """Compare two cognitive states and return their structural diff.

    Fetches both states, parses their JSON ``state_data`` payloads, and
    returns the field-level diff (via ``compute_state_diff``) plus the
    absolute time gap between the two states. ``state_1`` in the response
    always corresponds to the caller's ``state_id_1``.

    Raises:
        HTTPException 400: if either state ID is missing/empty.
        HTTPException 404: if one or both states cannot be found.
        HTTPException 500: on database or unexpected internal errors.
    """
    conn = None
    try:
        if not request.state_id_1 or not request.state_id_2:
            raise HTTPException(
                status_code=400, detail="Both state_id_1 and state_id_2 are required"
            )
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM cognitive_timeline_states WHERE state_id IN (?, ?)",
            (request.state_id_1, request.state_id_2),
        )
        states = [
            dict(zip([d[0] for d in cur.description], row, strict=False))
            for row in cur.fetchall()
        ]
        if len(states) != 2:
            found_ids = [s["state_id"] for s in states]
            missing_ids = [
                sid
                for sid in (request.state_id_1, request.state_id_2)
                if sid not in found_ids
            ]
            raise HTTPException(
                status_code=404, detail=f"State(s) not found: {', '.join(missing_ids)}"
            )
        for state in states:
            try:
                # state_data is stored as a JSON string; fall back to an
                # empty dict on NULL or malformed payloads.
                state["state_data"] = json.loads(state.get("state_data", "{}"))
            except Exception:
                state["state_data"] = {}
        states.sort(key=lambda s: s["timestamp"])
        state_1, state_2 = states
        # Report state_1 as the ID the caller passed first, regardless of
        # chronological order; time_diff below uses abs() so order is safe.
        if state_1["state_id"] != request.state_id_1:
            state_1, state_2 = state_2, state_1
        diff_result = compute_state_diff(
            state_1.get("state_data", {}), state_2.get("state_data", {})
        )
        time_diff_minutes = abs(state_2["timestamp"] - state_1["timestamp"]) / 60
        return StateComparisonResponse(
            state_1=StateComparisonInfo(
                state_id=state_1["state_id"],
                timestamp=state_1["timestamp"],
                formatted_timestamp=datetime.fromtimestamp(
                    state_1["timestamp"]
                ).isoformat(),
            ),
            state_2=StateComparisonInfo(
                state_id=state_2["state_id"],
                timestamp=state_2["timestamp"],
                formatted_timestamp=datetime.fromtimestamp(
                    state_2["timestamp"]
                ).isoformat(),
            ),
            time_diff_minutes=time_diff_minutes,
            diff=StateDiff(**diff_result),
        )
    except HTTPException:
        raise
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: {e}") from e
    finally:
        # Always release the DB connection; previously it leaked whenever an
        # exception (e.g. the 404) fired before the success-path close().
        if conn is not None:
            conn.close()
852 |
853 | # ---------- Action Monitor Helper Functions ----------
854 |
def get_action_status_indicator(status: str, execution_time: float) -> dict:
    """Build a visual status indicator (color/icon/label/urgency) for an action.

    Args:
        status: Raw action status string from the database.
        execution_time: Seconds the action has been executing so far.

    Returns:
        dict with keys ``color``, ``icon``, ``label`` and ``urgency``
        (urgency is one of "low", "medium", "high").
    """
    known_statuses = {
        "running": ("blue", "play", "Running"),
        "executing": ("blue", "cpu", "Executing"),
        "in_progress": ("orange", "clock", "In Progress"),
        "completed": ("green", "check", "Completed"),
        "failed": ("red", "x", "Failed"),
        "cancelled": ("gray", "stop", "Cancelled"),
        "timeout": ("yellow", "timer-off", "Timeout"),
    }
    color, icon, label = known_statuses.get(status, ("gray", "help", "Unknown"))

    # Long-running *active* actions get escalating urgency flags.
    urgency = "low"
    if status in ("running", "executing", "in_progress"):
        if execution_time > 120:  # more than 2 minutes
            urgency = "high"
        elif execution_time > 60:  # more than 1 minute
            urgency = "medium"

    return {"color": color, "icon": icon, "label": label, "urgency": urgency}
884 |
def categorize_action_performance(execution_time: float, estimated_duration: float) -> str:
    """Rate execution time against its estimate.

    Returns one of "excellent", "good", "acceptable", "slow", "very_slow",
    or "unknown" when no positive estimate is available.
    """
    if estimated_duration <= 0:
        return "unknown"

    ratio = execution_time / estimated_duration

    # Inclusive upper ratio bound for each category, in ascending order.
    bands = ((0.5, "excellent"), (0.8, "good"), (1.2, "acceptable"), (2.0, "slow"))
    for upper_bound, category in bands:
        if ratio <= upper_bound:
            return category
    return "very_slow"
902 |
def get_action_resource_usage(action_id: str) -> dict:
    """Return resource-usage metrics for an action.

    Placeholder: real metrics collection is not implemented yet, so every
    metric is reported as 0.0 regardless of ``action_id``.
    """
    metric_names = ("cpu_usage", "memory_usage", "network_io", "disk_io")
    return {name: 0.0 for name in metric_names}
907 |
def estimate_wait_time(position: int, queue: list) -> float:
    """Estimate seconds until the action at ``position`` (0-based) starts.

    Uses a flat 30-second average per action ahead in the queue; ``queue``
    is currently unused but kept for a future history-based estimate.
    """
    avg_seconds_per_action = 30.0
    return 0.0 if position == 0 else position * avg_seconds_per_action
915 |
def get_priority_label(priority: int) -> str:
    """Map a numeric priority (lower = more urgent) to a display label."""
    # (inclusive upper bound, label) pairs in ascending order.
    for upper_bound, label in ((1, "Critical"), (3, "High"), (5, "Normal"), (7, "Low")):
        if priority <= upper_bound:
            return label
    return "Very Low"
928 |
def calculate_action_performance_score(action: dict) -> float:
    """Score a completed action from 0-100 based on execution duration.

    Faster completions score higher; non-completed actions score 0.

    Bug fix: the >120s fallback previously used ``100 - time/10``, which
    scored *slower* actions above the 60-120s bracket (e.g. 130s -> 87
    while 100s -> 60). The fallback now decays from 60 and floors at 50,
    keeping the scale monotonically non-increasing in execution time.

    Args:
        action: Action row as a dict; reads ``status`` and
            ``execution_duration`` (seconds, missing treated as 0).

    Returns:
        Performance score in [0.0, 100.0].
    """
    if action["status"] != "completed":
        return 0.0

    execution_time = action.get("execution_duration", 0)
    if execution_time <= 0:
        # Missing/zero duration: treat as effectively instantaneous.
        return 100.0

    # (inclusive upper bound in seconds, score) brackets.
    for upper_bound, score in ((5, 100.0), (15, 90.0), (30, 80.0), (60, 70.0), (120, 60.0)):
        if execution_time <= upper_bound:
            return score
    # Beyond 2 minutes: decay from 60, floored at 50.
    return max(50.0, 60.0 - (execution_time - 120) / 10)
950 |
def calculate_efficiency_rating(execution_time: float, result_size: int) -> str:
    """Rate output-per-second efficiency as excellent/good/fair/poor.

    Returns "unknown" when ``execution_time`` is not positive.
    """
    if execution_time <= 0:
        return "unknown"

    # Output volume produced per second of execution.
    throughput = result_size / execution_time

    if throughput >= 100:
        return "excellent"
    if throughput >= 50:
        return "good"
    if throughput >= 20:
        return "fair"
    return "poor"
966 |
def calculate_performance_summary(actions: list) -> dict:
    """Aggregate performance statistics over a list of action dicts.

    Reads ``performance_score``, ``tool_name`` and ``efficiency_rating``
    from each action. Returns the average score, the best and worst
    performing tools, and the distribution of efficiency ratings.
    """
    if not actions:
        return {
            "avg_score": 0.0,
            "top_performer": None,
            "worst_performer": None,
            "efficiency_distribution": {},
        }

    def score_of(action: dict) -> float:
        return action.get("performance_score", 0)

    average = sum(score_of(a) for a in actions) / len(actions)
    best = max(actions, key=score_of)
    worst = min(actions, key=score_of)

    from collections import Counter

    rating_counts = Counter(a.get("efficiency_rating", "unknown") for a in actions)

    return {
        "avg_score": round(average, 2),
        "top_performer": {
            "tool_name": best.get("tool_name", ""),
            "score": score_of(best),
        },
        "worst_performer": {
            "tool_name": worst.get("tool_name", ""),
            "score": score_of(worst),
        },
        "efficiency_distribution": dict(rating_counts),
    }
999 |
def generate_performance_insights(
    overall_stats: dict, tool_stats: list, hourly_metrics: list
) -> list:
    """Generate actionable performance insights.

    Args:
        overall_stats: Aggregate counters; reads ``successful_actions`` and
            ``total_actions``.
        tool_stats: Per-tool stat dicts; reads ``tool_name`` and ``avg_duration``.
        hourly_metrics: Hourly stat dicts; reads ``hour`` and ``action_count``.

    Returns:
        List of insight dicts, each with ``type``, ``title``, ``message``
        and ``severity``.
    """
    insights = []

    # `or 1` also covers a present-but-zero total_actions, which previously
    # raised ZeroDivisionError (a missing key already defaulted to 1).
    total_actions = overall_stats.get("total_actions") or 1
    success_rate = (overall_stats.get("successful_actions", 0) / total_actions) * 100
    if success_rate < 80:
        insights.append(
            {
                "type": "warning",
                "title": "Low Success Rate",
                "message": f"Current success rate is {success_rate:.1f}%. Consider investigating failing tools.",
                "severity": "high",
            }
        )

    if tool_stats:
        slowest_tool = max(tool_stats, key=lambda t: t.get("avg_duration", 0))
        if slowest_tool.get("avg_duration", 0) > 60:
            insights.append(
                {
                    "type": "info",
                    "title": "Performance Optimization",
                    "message": f"{slowest_tool['tool_name']} is taking {slowest_tool['avg_duration']:.1f}s on average. Consider optimization.",
                    "severity": "medium",
                }
            )

    if hourly_metrics:
        peak_hour = max(hourly_metrics, key=lambda h: h.get("action_count", 0))
        insights.append(
            {
                "type": "info",
                "title": "Peak Usage",
                "message": f"Peak usage occurs at {peak_hour['hour']}:00 with {peak_hour['action_count']} actions.",
                "severity": "low",
            }
        )

    return insights
1043 |
1044 | # ---------- Action Monitor Pydantic Models ----------
1045 |
class StatusIndicator(BaseModel):
    """Action status indicator with visual cues.

    Populated from get_action_status_indicator(); see that helper for the
    status-to-color/icon mapping and the urgency thresholds.
    """

    color: str = Field(..., description="Color for visual representation")
    icon: str = Field(..., description="Icon name for the status")
    label: str = Field(..., description="Human-readable status label")
    urgency: str = Field(..., description="Urgency level: low, medium, high")
1053 |
class ResourceUsage(BaseModel):
    """Resource usage metrics for an action.

    Currently always zero-valued placeholders from
    get_action_resource_usage().
    NOTE(review): descriptions say percentage/KB-s, but the documented
    example payload shows memory_usage as 512.0 — confirm intended units.
    """

    cpu_usage: float = Field(..., description="CPU usage percentage")
    memory_usage: float = Field(..., description="Memory usage percentage")
    network_io: float = Field(..., description="Network I/O in KB/s")
    disk_io: float = Field(..., description="Disk I/O in KB/s")
1061 |
class RunningAction(BaseModel):
    """Model for a currently running action (returned by /actions/running)."""

    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Workflow title")
    tool_name: str = Field(..., description="Name of the tool being executed")
    status: str = Field(..., description="Current execution status")
    started_at: float = Field(..., description="Start timestamp")
    formatted_start_time: str = Field(..., description="ISO formatted start time")
    execution_time_seconds: float = Field(
        ..., description="Current execution duration in seconds"
    )
    estimated_duration: Optional[float] = Field(
        None, description="Estimated duration in seconds"
    )
    # Derived as (execution_time / estimated_duration) * 100, capped at 95
    # until the action actually completes.
    progress_percentage: float = Field(..., description="Estimated progress percentage")
    status_indicator: StatusIndicator = Field(..., description="Visual status indicator")
    performance_category: str = Field(..., description="Performance categorization")
    resource_usage: ResourceUsage = Field(..., description="Current resource usage")
    # Parsed from the action row's JSON tool_data column; {} when NULL/invalid.
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )
1085 |
class RunningActionsResponse(BaseModel):
    """Response for currently running actions (GET /actions/running)."""

    running_actions: List[RunningAction] = Field(
        ..., description="List of currently executing actions"
    )
    total_running: int = Field(..., description="Total number of running actions")
    # Mean of execution_time_seconds across running_actions; 0 when none.
    avg_execution_time: float = Field(
        ..., description="Average execution time of running actions"
    )
    timestamp: str = Field(..., description="Response timestamp")
1097 |
class QueuedAction(BaseModel):
    """Model for a queued action (returned by /actions/queue)."""

    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Workflow title")
    tool_name: str = Field(..., description="Name of the tool to be executed")
    status: str = Field(..., description="Queue status")
    created_at: float = Field(..., description="Creation timestamp")
    formatted_queue_time: str = Field(..., description="ISO formatted queue time")
    queue_position: int = Field(..., description="Position in the queue (1-based)")
    queue_time_seconds: float = Field(..., description="Time spent in queue")
    # Flat estimate from estimate_wait_time(): ~30s per action ahead.
    estimated_wait_time: float = Field(..., description="Estimated wait time in seconds")
    # Sourced from json_extract(tool_data, '$.priority'), default 5; lower
    # numbers are more urgent (see get_priority_label).
    priority: int = Field(..., description="Numeric priority value")
    priority_label: str = Field(..., description="Human-readable priority label")
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )
1116 |
class ActionQueueResponse(BaseModel):
    """Response for action queue status (GET /actions/queue)."""

    queued_actions: List[QueuedAction] = Field(..., description="List of queued actions")
    total_queued: int = Field(..., description="Total number of queued actions")
    avg_queue_time: float = Field(..., description="Average time in queue")
    # Head of the queue (highest priority / earliest created), if any.
    next_action: Optional[QueuedAction] = Field(
        None, description="Next action to be executed"
    )
    timestamp: str = Field(..., description="Response timestamp")
1127 |
1128 | # ---------- Action Monitor Endpoints ----------
1129 |
@app.get(
    "/actions/running",
    response_model=RunningActionsResponse,
    tags=["Action Monitor"],
    summary="Get currently executing actions",
    description="""
Monitor actions that are currently executing with real-time status information:

- **Execution progress** with percentage completion estimates
- **Performance categorization** (excellent, good, slow, etc.)
- **Resource usage indicators** (placeholder for future implementation)
- **Status indicators** with urgency levels
- **Estimated duration** vs actual execution time

Ideal for monitoring system activity and identifying long-running or problematic actions.
""",
    responses={
        200: {
            "description": "List of currently running actions with real-time metrics",
            "content": {
                "application/json": {
                    "example": {
                        "running_actions": [
                            {
                                "action_id": "act_123",
                                "workflow_id": "wf_456",
                                "workflow_title": "Document Analysis",
                                "tool_name": "smart_browser",
                                "status": "running",
                                "started_at": 1703980800.0,
                                "formatted_start_time": "2024-01-01T00:00:00",
                                "execution_time_seconds": 45.5,
                                "estimated_duration": 60.0,
                                "progress_percentage": 75.8,
                                "status_indicator": {
                                    "color": "blue",
                                    "icon": "play",
                                    "label": "Running",
                                    "urgency": "low",
                                },
                                "performance_category": "good",
                                "resource_usage": {
                                    "cpu_usage": 25.5,
                                    "memory_usage": 512.0,
                                    "network_io": 10.5,
                                    "disk_io": 5.2,
                                },
                                "tool_data": {
                                    "url": "https://example.com",
                                    "action_type": "download",
                                },
                            }
                        ],
                        "total_running": 1,
                        "avg_execution_time": 45.5,
                        "timestamp": "2024-01-01T00:00:45.500000",
                    }
                }
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def get_running_actions() -> RunningActionsResponse:
    """Get currently executing actions with real-time status.

    Joins active actions with their workflow titles, derives progress,
    status-indicator and performance metadata for each, and returns
    aggregate timing statistics.

    Raises:
        HTTPException 500: on database or unexpected internal errors.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT
                a.*,
                w.title as workflow_title,
                (unixepoch() - a.started_at) as execution_time,
                CASE
                    WHEN a.tool_data IS NOT NULL THEN json_extract(a.tool_data, '$.estimated_duration')
                    ELSE NULL
                END as estimated_duration
            FROM actions a
            LEFT JOIN workflows w ON a.workflow_id = w.workflow_id
            WHERE a.status IN ('running', 'executing', 'in_progress')
            ORDER BY a.started_at ASC
        """)

        columns = [description[0] for description in cursor.description]
        running_actions = [
            dict(zip(columns, row, strict=False)) for row in cursor.fetchall()
        ]

        # Enhance with real-time metrics
        enhanced_actions = []
        for action in running_actions:
            try:
                # tool_data may be SQL NULL; `or "{}"` avoids json.loads(None).
                tool_data = json.loads(action.get("tool_data") or "{}")
            except Exception:
                tool_data = {}

            execution_time = action.get("execution_time", 0)
            estimated_duration = action.get("estimated_duration") or 30
            # Cap progress at 95% — the action hasn't completed yet.
            progress_percentage = (
                min(95, (execution_time / estimated_duration) * 100)
                if estimated_duration > 0
                else 0
            )

            enhanced_action = RunningAction(
                action_id=action["action_id"],
                workflow_id=action.get("workflow_id"),
                workflow_title=action.get("workflow_title"),
                tool_name=action["tool_name"],
                status=action["status"],
                started_at=action["started_at"],
                formatted_start_time=datetime.fromtimestamp(
                    action["started_at"]
                ).isoformat(),
                execution_time_seconds=execution_time,
                estimated_duration=estimated_duration,
                progress_percentage=progress_percentage,
                status_indicator=StatusIndicator(
                    **get_action_status_indicator(action["status"], execution_time)
                ),
                performance_category=categorize_action_performance(
                    execution_time, estimated_duration
                ),
                resource_usage=ResourceUsage(
                    **get_action_resource_usage(action["action_id"])
                ),
                tool_data=tool_data,
            )
            enhanced_actions.append(enhanced_action)

        return RunningActionsResponse(
            running_actions=enhanced_actions,
            total_running=len(enhanced_actions),
            avg_execution_time=(
                sum(a.execution_time_seconds for a in enhanced_actions)
                / len(enhanced_actions)
                if enhanced_actions
                else 0
            ),
            timestamp=datetime.now().isoformat(),
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Close on every path; previously the connection leaked whenever an
        # exception fired before the success-path close().
        if conn is not None:
            conn.close()
1279 |
@app.get(
    "/actions/queue",
    response_model=ActionQueueResponse,
    tags=["Action Monitor"],
    summary="Get queued actions waiting for execution",
    description="""
Monitor the action execution queue to understand:

- **Queue position** for each waiting action
- **Priority levels** with human-readable labels
- **Estimated wait times** based on queue position
- **Queue time** (how long actions have been waiting)

Essential for understanding system load and execution priorities.
""",
    responses={
        200: {
            "description": "List of queued actions with wait time estimates",
            "content": {
                "application/json": {
                    "example": {
                        "queued_actions": [
                            {
                                "action_id": "act_789",
                                "workflow_id": "wf_456",
                                "workflow_title": "Batch Processing",
                                "tool_name": "convert_document",
                                "status": "queued",
                                "created_at": 1703980700.0,
                                "formatted_queue_time": "2024-01-01T00:00:00",
                                "queue_position": 1,
                                "queue_time_seconds": 100.0,
                                "estimated_wait_time": 0.0,
                                "priority": 3,
                                "priority_label": "High",
                                "tool_data": {"format": "pdf", "pages": 50},
                            }
                        ],
                        "total_queued": 1,
                        "avg_queue_time": 100.0,
                        "next_action": {
                            "action_id": "act_789",
                            "tool_name": "convert_document",
                            "priority_label": "High",
                        },
                        "timestamp": "2024-01-01T00:01:40",
                    }
                }
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def get_action_queue() -> ActionQueueResponse:
    """Get queued actions waiting for execution.

    Orders pending actions by priority (ascending, lower = more urgent)
    then creation time, and annotates each with queue position, a
    wait-time estimate and a human-readable priority label.

    Raises:
        HTTPException 500: on database or unexpected internal errors.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT
                a.*,
                w.title as workflow_title,
                (unixepoch() - a.created_at) as queue_time,
                CASE
                    WHEN a.tool_data IS NOT NULL THEN json_extract(a.tool_data, '$.priority')
                    ELSE 5
                END as priority
            FROM actions a
            LEFT JOIN workflows w ON a.workflow_id = w.workflow_id
            WHERE a.status IN ('queued', 'pending', 'waiting')
            ORDER BY priority ASC, a.created_at ASC
        """)

        columns = [description[0] for description in cursor.description]
        queued_actions = [
            dict(zip(columns, row, strict=False)) for row in cursor.fetchall()
        ]

        # Enhance queue data
        enhanced_queue = []
        for i, action in enumerate(queued_actions):
            try:
                # tool_data may be SQL NULL; `or "{}"` avoids json.loads(None).
                tool_data = json.loads(action.get("tool_data") or "{}")
            except Exception:
                tool_data = {}

            # json_extract returns NULL when tool_data exists but lacks
            # '$.priority'; previously that None reached get_priority_label
            # and raised a TypeError (surfacing as a 500). Default to 5.
            priority = action.get("priority")
            if priority is None:
                priority = 5

            enhanced_action = QueuedAction(
                action_id=action["action_id"],
                workflow_id=action.get("workflow_id"),
                workflow_title=action.get("workflow_title"),
                tool_name=action["tool_name"],
                status=action["status"],
                created_at=action["created_at"],
                formatted_queue_time=datetime.fromtimestamp(
                    action["created_at"]
                ).isoformat(),
                queue_position=i + 1,
                queue_time_seconds=action.get("queue_time", 0),
                estimated_wait_time=estimate_wait_time(i, queued_actions),
                priority=priority,
                priority_label=get_priority_label(priority),
                tool_data=tool_data,
            )
            enhanced_queue.append(enhanced_action)

        return ActionQueueResponse(
            queued_actions=enhanced_queue,
            total_queued=len(enhanced_queue),
            avg_queue_time=(
                sum(a.queue_time_seconds for a in enhanced_queue)
                / len(enhanced_queue)
                if enhanced_queue
                else 0
            ),
            next_action=enhanced_queue[0] if enhanced_queue else None,
            timestamp=datetime.now().isoformat(),
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Close on every path; previously the connection leaked whenever an
        # exception fired before the success-path close().
        if conn is not None:
            conn.close()
1405 |
1406 | # ---------- Action History Pydantic Models ----------
1407 |
class ActionHistoryItem(BaseModel):
    """Model for a single action in history (returned by /actions/history)."""

    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Associated workflow title")
    tool_name: str = Field(..., description="Name of the tool executed")
    action_type: Optional[str] = Field(None, description="Type of action")
    status: str = Field(..., description="Action completion status")
    started_at: float = Field(..., description="Unix timestamp when action started")
    completed_at: Optional[float] = Field(
        None, description="Unix timestamp when action completed"
    )
    # Computed in SQL as (completed_at - started_at).
    execution_duration_seconds: float = Field(
        ..., description="Total execution time in seconds"
    )
    # Computed by calculate_action_performance_score().
    performance_score: float = Field(
        ..., description="Calculated performance score (0-100)"
    )
    # Computed by calculate_efficiency_rating(duration, result_size).
    efficiency_rating: str = Field(
        ..., description="Efficiency rating based on time and output"
    )
    # 1 when status == "completed", else 0.
    success_rate_impact: int = Field(..., description="Impact on success rate (1 or 0)")
    formatted_start_time: str = Field(..., description="ISO formatted start time")
    formatted_completion_time: Optional[str] = Field(
        None, description="ISO formatted completion time"
    )
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )
    result_data: Dict[str, Any] = Field(
        default_factory=dict, description="Action result data"
    )
    # Sourced from json_extract(tool_data, '$.result_size'), default 0.
    result_size: int = Field(0, description="Size of the result data")
1442 |
class PerformanceSummary(BaseModel):
    """Performance summary statistics.

    Matches the dict shape produced by calculate_performance_summary():
    top/worst performer entries carry ``tool_name`` and ``score``.
    """

    avg_score: float = Field(..., description="Average performance score")
    top_performer: Optional[Dict[str, Any]] = Field(
        None, description="Best performing tool"
    )
    worst_performer: Optional[Dict[str, Any]] = Field(
        None, description="Worst performing tool"
    )
    efficiency_distribution: Dict[str, int] = Field(
        ..., description="Distribution of efficiency ratings"
    )
1456 |
class ActionHistoryResponse(BaseModel):
    """Response model for action history (GET /actions/history)."""

    action_history: List[ActionHistoryItem] = Field(
        ..., description="List of completed actions"
    )
    total_actions: int = Field(
        ..., description="Total number of actions in the time period"
    )
    # Percentage in [0, 100].
    success_rate: float = Field(..., description="Overall success rate percentage")
    avg_execution_time: float = Field(..., description="Average execution time in seconds")
    performance_summary: PerformanceSummary = Field(
        ..., description="Performance summary statistics"
    )
    timestamp: str = Field(..., description="Response timestamp")
1472 |
1473 | # ---------- Action History Endpoint ----------
1474 |
1475 | @app.get(
1476 | "/actions/history",
1477 | response_model=ActionHistoryResponse,
1478 | tags=["Action Monitor"],
1479 | summary="Get completed actions with performance metrics",
1480 | description="""
1481 | Analyze historical action execution data with comprehensive performance metrics:
1482 |
1483 | - **Execution duration** and performance scoring
1484 | - **Success/failure rates** and efficiency ratings
1485 | - **Tool-specific filtering** and status filtering
1486 | - **Aggregate performance metrics** and trends
1487 |
1488 | Perfect for performance analysis, debugging, and system optimization.
1489 | """,
1490 | responses={
1491 | 200: {
1492 | "description": "Historical actions with performance analysis",
1493 | "content": {
1494 | "application/json": {
1495 | "example": {
1496 | "action_history": [
1497 | {
1498 | "action_id": "act_001",
1499 | "workflow_id": "workflow_123",
1500 | "workflow_title": "Document Analysis",
1501 | "tool_name": "smart_browser",
1502 | "action_type": "tool_execution",
1503 | "status": "completed",
1504 | "started_at": 1703980800.0,
1505 | "completed_at": 1703980815.0,
1506 | "execution_duration_seconds": 15.0,
1507 | "performance_score": 90.0,
1508 | "efficiency_rating": "good",
1509 | "success_rate_impact": 1,
1510 | "formatted_start_time": "2024-01-01T00:00:00",
1511 | "formatted_completion_time": "2024-01-01T00:00:15",
1512 | "tool_data": {"url": "https://example.com"},
1513 | "result_data": {"pages_analyzed": 5},
1514 | "result_size": 2048,
1515 | }
1516 | ],
1517 | "total_actions": 150,
1518 | "success_rate": 95.3,
1519 | "avg_execution_time": 12.5,
1520 | "performance_summary": {
1521 | "avg_score": 87.5,
1522 | "top_performer": {"tool_name": "file_reader", "score": 98.5},
1523 | "worst_performer": {"tool_name": "web_scraper", "score": 65.0},
1524 | "efficiency_distribution": {
1525 | "excellent": 45,
1526 | "good": 80,
1527 | "fair": 20,
1528 | "poor": 5,
1529 | },
1530 | },
1531 | "timestamp": "2024-01-01T12:00:00",
1532 | }
1533 | }
1534 | },
1535 | },
1536 | 500: {"description": "Internal server error"},
1537 | },
1538 | )
async def get_action_history(
    limit: int = Query(
        50, description="Maximum number of actions to return", ge=1, le=500, example=100
    ),
    offset: int = Query(
        0, description="Number of actions to skip for pagination", ge=0, example=0
    ),
    status_filter: Optional[str] = Query(
        None,
        description="Filter by action completion status",
        regex="^(completed|failed|cancelled|timeout)$",
        example="completed",
    ),
    tool_filter: Optional[str] = Query(
        None, description="Filter by specific tool name", example="smart_browser"
    ),
    hours_back: int = Query(
        24,
        description="Hours back to search for completed actions",
        ge=1,
        le=720,  # Max 30 days
        example=24,
    ),
) -> ActionHistoryResponse:
    """Get completed actions with performance metrics.

    Fetches terminal-status actions (completed/failed/cancelled/timeout) that
    finished within the last ``hours_back`` hours, enriches each row with
    derived performance metrics, and returns them with aggregate
    success-rate and duration statistics.

    Raises:
        HTTPException: 500 for database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        since_timestamp = datetime.now().timestamp() - (hours_back * 3600)

        query = """
            SELECT
                a.*,
                w.title as workflow_title,
                (a.completed_at - a.started_at) as execution_duration,
                CASE
                    WHEN a.tool_data IS NOT NULL THEN json_extract(a.tool_data, '$.result_size')
                    ELSE 0
                END as result_size
            FROM actions a
            LEFT JOIN workflows w ON a.workflow_id = w.workflow_id
            WHERE a.status IN ('completed', 'failed', 'cancelled', 'timeout')
            AND a.completed_at >= ?
        """
        params = [since_timestamp]

        if status_filter:
            query += " AND a.status = ?"
            params.append(status_filter)

        if tool_filter:
            query += " AND a.tool_name = ?"
            params.append(tool_filter)

        query += """
            ORDER BY a.completed_at DESC
            LIMIT ? OFFSET ?
        """
        params.extend([limit, offset])

        cursor.execute(query, params)
        columns = [description[0] for description in cursor.description]
        completed_actions = [
            dict(zip(columns, row, strict=False)) for row in cursor.fetchall()
        ]

        # Calculate performance metrics
        enhanced_history = []
        for action in completed_actions:
            try:
                # `or "{}"` guards NULL columns: json.loads(None) would raise.
                tool_data = json.loads(action.get("tool_data") or "{}")
                result_data = json.loads(action.get("result") or "{}")
            except Exception:
                # Malformed JSON payloads degrade to empty dicts rather than 500s.
                tool_data = {}
                result_data = {}

            execution_duration = action.get("execution_duration", 0)

            # Create performance-enhanced action item
            action_data = {
                "action_id": action["action_id"],
                "workflow_id": action.get("workflow_id"),
                "workflow_title": action.get("workflow_title"),
                "tool_name": action["tool_name"],
                "action_type": action.get("action_type"),
                "status": action["status"],
                "started_at": action["started_at"],
                "completed_at": action.get("completed_at"),
                "execution_duration_seconds": execution_duration,
                "performance_score": calculate_action_performance_score(action),
                "efficiency_rating": calculate_efficiency_rating(
                    execution_duration, action.get("result_size", 0)
                ),
                "success_rate_impact": 1 if action["status"] == "completed" else 0,
                "formatted_start_time": datetime.fromtimestamp(
                    action["started_at"]
                ).isoformat(),
                "formatted_completion_time": datetime.fromtimestamp(
                    action["completed_at"]
                ).isoformat()
                if action.get("completed_at")
                else None,
                "tool_data": tool_data,
                "result_data": result_data,
                "result_size": action.get("result_size", 0),
            }

            enhanced_history.append(ActionHistoryItem(**action_data))

        # Calculate aggregate metrics
        total_actions = len(enhanced_history)
        successful_actions = len([a for a in enhanced_history if a.status == "completed"])
        avg_duration = (
            sum(a.execution_duration_seconds for a in enhanced_history) / total_actions
            if total_actions > 0
            else 0
        )

        # Create performance summary
        action_dicts = [a.dict() for a in enhanced_history]
        performance_summary = PerformanceSummary(
            **calculate_performance_summary(action_dicts)
        )

        return ActionHistoryResponse(
            action_history=enhanced_history,
            total_actions=total_actions,
            success_rate=(successful_actions / total_actions * 100)
            if total_actions > 0
            else 0,
            avg_execution_time=avg_duration,
            performance_summary=performance_summary,
            timestamp=datetime.now().isoformat(),
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Always release the connection, even when an error short-circuits above
        # (the original leaked it on every exception path).
        if conn is not None:
            conn.close()
1683 |
1684 | # ---------- Action Metrics Pydantic Models ----------
1685 |
class OverallMetrics(BaseModel):
    """Overall action execution metrics"""

    # Raw counts over the reporting window.
    total_actions: int = Field(..., description="Total number of actions executed")
    successful_actions: int = Field(..., description="Number of successfully completed actions")
    failed_actions: int = Field(..., description="Number of failed actions")
    # None when no action in the window has a recorded completion time.
    avg_duration: Optional[float] = Field(None, description="Average execution duration in seconds")
    # Derived percentages in the 0-100 range.
    success_rate_percentage: float = Field(..., description="Overall success rate as percentage")
    failure_rate_percentage: float = Field(..., description="Overall failure rate as percentage")
    avg_duration_seconds: float = Field(..., description="Average duration in seconds")
1704 |
class ToolUsageStat(BaseModel):
    """Statistics for a single tool"""

    tool_name: str = Field(..., description="Name of the tool")
    usage_count: int = Field(..., description="Number of times the tool was used")
    success_count: int = Field(..., description="Number of successful executions")
    # None when no execution of this tool has a recorded completion time.
    avg_duration: Optional[float] = Field(None, description="Average execution time in seconds")
1714 |
class HourlyMetric(BaseModel):
    """Hourly performance metrics"""

    # Two-digit hour string produced by SQLite strftime('%H', ...).
    hour: str = Field(..., description="Hour of the day (0-23)")
    action_count: int = Field(..., description="Number of actions in this hour")
    avg_duration: Optional[float] = Field(None, description="Average duration for this hour")
    success_count: int = Field(..., description="Number of successful actions")
1724 |
class PerformanceInsight(BaseModel):
    """Performance insight or recommendation"""

    type: str = Field(
        ..., description="Type of insight (warning, info, etc.)"
    )
    title: str = Field(
        ..., description="Title of the insight"
    )
    message: str = Field(
        ..., description="Detailed message"
    )
    severity: str = Field(
        ..., description="Severity level (high, medium, low)"
    )
1732 |
class ActionMetricsResponse(BaseModel):
    """Response model for action metrics"""

    overall_metrics: OverallMetrics = Field(..., description="Overall execution metrics")
    tool_usage_stats: List[ToolUsageStat] = Field(..., description="Per-tool usage statistics")
    hourly_performance: List[HourlyMetric] = Field(..., description="Hourly performance breakdown")
    performance_insights: List[PerformanceInsight] = Field(..., description="Actionable insights and recommendations")
    # ISO-8601 string stamped when the response is built.
    timestamp: str = Field(..., description="Response timestamp")
1747 |
1748 | # ---------- Action Metrics Endpoint ----------
1749 |
@app.get(
    "/actions/metrics",
    response_model=ActionMetricsResponse,
    tags=["Action Monitor"],
    summary="Get comprehensive action execution metrics",
    description="""
    Retrieve system-wide action execution analytics including:

    - **Overall success/failure rates** for the past 24 hours
    - **Tool usage statistics** with performance breakdowns
    - **Hourly performance distribution** showing usage patterns
    - **Performance insights** with actionable recommendations

    This endpoint provides executive-level insights into system performance and health.
    """,
    responses={
        200: {
            "description": "Comprehensive action execution metrics and analytics",
            "content": {
                "application/json": {
                    "example": {
                        "overall_metrics": {
                            "total_actions": 1523,
                            "successful_actions": 1450,
                            "failed_actions": 73,
                            "avg_duration": 8.5,
                            "success_rate_percentage": 95.2,
                            "failure_rate_percentage": 4.8,
                            "avg_duration_seconds": 8.5,
                        },
                        "tool_usage_stats": [
                            {
                                "tool_name": "smart_browser",
                                "usage_count": 342,
                                "success_count": 325,
                                "avg_duration": 15.3,
                            },
                            {
                                "tool_name": "file_reader",
                                "usage_count": 289,
                                "success_count": 287,
                                "avg_duration": 2.1,
                            },
                        ],
                        "hourly_performance": [
                            {
                                "hour": "09",
                                "action_count": 125,
                                "avg_duration": 7.8,
                                "success_count": 120,
                            },
                            {
                                "hour": "10",
                                "action_count": 143,
                                "avg_duration": 8.2,
                                "success_count": 138,
                            },
                        ],
                        "performance_insights": [
                            {
                                "type": "warning",
                                "title": "Low Success Rate",
                                "message": "Current success rate is 75.5%. Consider investigating failing tools.",
                                "severity": "high",
                            },
                            {
                                "type": "info",
                                "title": "Peak Usage",
                                "message": "Peak usage occurs at 14:00 with 189 actions.",
                                "severity": "low",
                            },
                        ],
                        "timestamp": "2024-01-01T12:00:00",
                    }
                }
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def get_action_metrics() -> ActionMetricsResponse:
    """Get comprehensive action execution metrics and analytics.

    Aggregates the last 24 hours of the actions table into overall
    success/failure counts, per-tool usage statistics, an hourly breakdown,
    and generated performance insights.

    Raises:
        HTTPException: 500 for database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Get metrics for last 24 hours
        since_timestamp = datetime.now().timestamp() - (24 * 3600)

        # Overall statistics
        cursor.execute(
            """
            SELECT
                COUNT(*) as total_actions,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful_actions,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_actions,
                AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration
            FROM actions
            WHERE created_at >= ?
        """,
            (since_timestamp,),
        )

        overall_result = cursor.fetchone()
        overall_dict = dict(
            zip([d[0] for d in cursor.description], overall_result, strict=False)
        )

        # SUM() returns NULL (None) when no rows match; coalesce before math so
        # an empty window yields zeros instead of a TypeError.
        total_actions = overall_dict["total_actions"] or 0
        successful_actions = overall_dict["successful_actions"] or 0

        success_rate = (
            (successful_actions / total_actions * 100) if total_actions > 0 else 0
        )

        overall_metrics = OverallMetrics(
            total_actions=total_actions,
            successful_actions=successful_actions,
            failed_actions=overall_dict["failed_actions"] or 0,
            avg_duration=overall_dict["avg_duration"],
            success_rate_percentage=success_rate,
            failure_rate_percentage=100 - success_rate,
            avg_duration_seconds=overall_dict["avg_duration"] or 0,
        )

        # Tool usage statistics
        cursor.execute(
            """
            SELECT
                tool_name,
                COUNT(*) as usage_count,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as success_count,
                AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration
            FROM actions
            WHERE created_at >= ?
            GROUP BY tool_name
            ORDER BY usage_count DESC
        """,
            (since_timestamp,),
        )

        tool_stats = [
            ToolUsageStat(
                tool_name=row[0],
                usage_count=row[1],
                success_count=row[2],
                avg_duration=row[3],
            )
            for row in cursor.fetchall()
        ]

        # Performance distribution over time (hourly)
        cursor.execute(
            """
            SELECT
                strftime('%H', datetime(started_at, 'unixepoch')) as hour,
                COUNT(*) as action_count,
                AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as success_count
            FROM actions
            WHERE started_at >= ?
            GROUP BY hour
            ORDER BY hour
        """,
            (since_timestamp,),
        )

        hourly_metrics = [
            HourlyMetric(
                hour=row[0], action_count=row[1], avg_duration=row[2], success_count=row[3]
            )
            for row in cursor.fetchall()
        ]

        # Generate performance insights
        tool_stats_dicts = [t.dict() for t in tool_stats]
        hourly_metrics_dicts = [h.dict() for h in hourly_metrics]
        insights_data = generate_performance_insights(
            overall_dict, tool_stats_dicts, hourly_metrics_dicts
        )

        performance_insights = [PerformanceInsight(**insight) for insight in insights_data]

        return ActionMetricsResponse(
            overall_metrics=overall_metrics,
            tool_usage_stats=tool_stats,
            hourly_performance=hourly_metrics,
            performance_insights=performance_insights,
            timestamp=datetime.now().isoformat(),
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Always release the connection, even when an error short-circuits above
        # (the original leaked it on every exception path).
        if conn is not None:
            conn.close()
1949 |
1950 | # ---------- Artifacts Helper Functions ----------
1951 |
def format_file_size(size_bytes: int) -> str:
    """Format file size in human readable format (e.g. ``"1.5 KB"``).

    Args:
        size_bytes: Size in bytes. Zero and negative values return ``"0 B"``
            (math.log is undefined for values <= 0).

    Returns:
        The size scaled to the largest fitting binary unit (B..TB), rounded
        to two decimal places. Sizes of 1 PiB or more are expressed in TB
        instead of raising an IndexError.
    """
    if size_bytes <= 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]
    # Clamp the exponent so sizes >= 1024**5 don't index past size_names.
    i = min(int(math.floor(math.log(size_bytes, 1024))), len(size_names) - 1)
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return f"{s} {size_names[i]}"
1962 |
1963 | # ---------- Artifacts Pydantic Models ----------
1964 |
class Artifact(BaseModel):
    """Model for a single artifact"""

    # Identity and classification.
    artifact_id: str = Field(..., description="Unique artifact identifier")
    name: str = Field(..., description="Name of the artifact")
    artifact_type: str = Field(..., description="Type of artifact (document, image, code, etc.)")
    description: Optional[str] = Field(None, description="Description of the artifact")
    file_path: Optional[str] = Field(None, description="File system path to the artifact")
    # Optional workflow association.
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Title of associated workflow")
    # Raw epoch timestamps and size information.
    created_at: float = Field(..., description="Creation timestamp")
    updated_at: float = Field(..., description="Last update timestamp")
    file_size: int = Field(..., description="File size in bytes")
    file_size_human: str = Field(..., description="Human-readable file size")
    importance: Optional[float] = Field(None, description="Importance score (1-10)")
    access_count: int = Field(0, description="Number of times accessed")
    tags: List[str] = Field(default_factory=list, description="Associated tags")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
    # Aggregate counts computed by the listing query's JOINs.
    relationship_count: int = Field(0, description="Number of related artifacts")
    version_count: int = Field(0, description="Number of versions")
    # Presentation helpers derived from the raw timestamps above.
    formatted_created_at: str = Field(..., description="ISO formatted creation date")
    formatted_updated_at: str = Field(..., description="ISO formatted update date")
    age_days: float = Field(..., description="Age of artifact in days")
1992 |
class ArtifactsFilter(BaseModel):
    """Filter parameters used in the request"""

    # Echo of the optional query filters (None when not supplied).
    artifact_type: Optional[str] = Field(
        None, description="Type filter applied"
    )
    workflow_id: Optional[str] = Field(
        None, description="Workflow filter applied"
    )
    tags: Optional[str] = Field(
        None, description="Tags filter applied"
    )
    search: Optional[str] = Field(
        None, description="Search query applied"
    )
    # Sorting parameters are always present.
    sort_by: str = Field(
        ..., description="Sort field used"
    )
    sort_order: str = Field(
        ..., description="Sort order used"
    )
2002 |
class ArtifactsResponse(BaseModel):
    """Response model for artifacts listing"""

    artifacts: List[Artifact] = Field(
        ..., description="List of artifacts"
    )
    total: int = Field(
        ..., description="Total number of artifacts matching query"
    )
    has_more: bool = Field(
        ..., description="Whether there are more artifacts available"
    )
    filters: ArtifactsFilter = Field(
        ..., description="Filters that were applied"
    )
2010 |
2011 | # ---------- Artifacts Listing Endpoint ----------
2012 |
@app.get(
    "/artifacts",
    response_model=ArtifactsResponse,
    tags=["Artifacts"],
    summary="List artifacts with filtering and search",
    description="""
    Explore system artifacts with comprehensive filtering and search capabilities:

    - **Type-based filtering** for specific artifact categories
    - **Workflow association** to see artifacts by workflow
    - **Tag-based search** for categorized artifacts
    - **Full-text search** across names and descriptions
    - **Sorting options** with configurable order

    Includes relationship counts, version information, and human-readable metadata.
    """,
    responses={
        200: {
            "description": "List of artifacts with metadata and relationships",
            "content": {
                "application/json": {
                    "example": {
                        "artifacts": [
                            {
                                "artifact_id": "artifact_123",
                                "name": "Analysis Report",
                                "artifact_type": "document",
                                "description": "Comprehensive analysis of user behavior",
                                "file_path": "/storage/artifacts/report_123.pdf",
                                "workflow_id": "workflow_456",
                                "workflow_title": "User Analysis Workflow",
                                "created_at": 1703980800.0,
                                "updated_at": 1704067200.0,
                                "file_size": 2048576,
                                "file_size_human": "2.0 MB",
                                "importance": 8.5,
                                "access_count": 15,
                                "tags": ["analysis", "report", "important"],
                                "metadata": {"pages": 42, "format": "PDF"},
                                "relationship_count": 3,
                                "version_count": 2,
                                "formatted_created_at": "2024-01-01T00:00:00",
                                "formatted_updated_at": "2024-01-01T12:00:00",
                                "age_days": 1.5,
                            }
                        ],
                        "total": 1,
                        "has_more": False,
                        "filters": {
                            "artifact_type": "document",
                            "workflow_id": None,
                            "tags": None,
                            "search": None,
                            "sort_by": "created_at",
                            "sort_order": "desc",
                        },
                    }
                }
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def get_artifacts(
    artifact_type: Optional[str] = Query(
        None, description="Filter by specific artifact type", example="document"
    ),
    workflow_id: Optional[str] = Query(
        None, description="Filter by workflow ID", example="workflow_abc123"
    ),
    tags: Optional[str] = Query(
        None, description="Search within artifact tags", example="important"
    ),
    search: Optional[str] = Query(
        None,
        description="Full-text search in names and descriptions",
        example="analysis report",
    ),
    sort_by: str = Query(
        "created_at",
        description="Field to sort results by",
        regex="^(created_at|updated_at|name|importance|access_count)$",
    ),
    sort_order: str = Query(
        "desc", description="Sort order direction", regex="^(asc|desc)$"
    ),
    limit: int = Query(
        50, description="Maximum number of artifacts to return", ge=1, le=200
    ),
    offset: int = Query(0, description="Number of artifacts to skip for pagination", ge=0),
) -> ArtifactsResponse:
    """List artifacts with filtering and search.

    Builds a parameterized query over the artifacts table (joined with
    workflows and relationship/version aggregates), applies the requested
    filters, and returns one page of enriched artifact records.

    Raises:
        HTTPException: 500 for database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Base query
        query = """
            SELECT
                a.*,
                w.title as workflow_title,
                COUNT(DISTINCT ar.target_artifact_id) as relationship_count,
                COUNT(DISTINCT versions.artifact_id) as version_count
            FROM artifacts a
            LEFT JOIN workflows w ON a.workflow_id = w.workflow_id
            LEFT JOIN artifact_relationships ar ON a.artifact_id = ar.source_artifact_id
            LEFT JOIN artifacts versions ON a.artifact_id = versions.parent_artifact_id
            WHERE 1=1
        """
        params = []

        if artifact_type:
            query += " AND a.artifact_type = ?"
            params.append(artifact_type)

        if workflow_id:
            query += " AND a.workflow_id = ?"
            params.append(workflow_id)

        if tags:
            query += " AND a.tags LIKE ?"
            params.append(f"%{tags}%")

        if search:
            query += " AND (a.name LIKE ? OR a.description LIKE ?)"
            params.extend([f"%{search}%", f"%{search}%"])

        # Interpolating sort_by/sort_order into the SQL is safe only because
        # both are locked to closed sets by the Query regex validators above.
        query += f"""
            GROUP BY a.artifact_id
            ORDER BY a.{sort_by} {"DESC" if sort_order == "desc" else "ASC"}
            LIMIT ? OFFSET ?
        """
        params.extend([limit, offset])

        cursor.execute(query, params)
        columns = [description[0] for description in cursor.description]
        artifacts_data = [
            dict(zip(columns, row, strict=False)) for row in cursor.fetchall()
        ]

        # Enhance artifacts with metadata
        artifacts = []
        for artifact_data in artifacts_data:
            # Parse tags and metadata; malformed JSON degrades to empty values.
            try:
                tags_list = (
                    json.loads(artifact_data.get("tags", "[]"))
                    if artifact_data.get("tags")
                    else []
                )
                metadata_dict = (
                    json.loads(artifact_data.get("metadata", "{}"))
                    if artifact_data.get("metadata")
                    else {}
                )
            except Exception:
                tags_list = []
                metadata_dict = {}

            artifact = Artifact(
                artifact_id=artifact_data["artifact_id"],
                name=artifact_data["name"],
                artifact_type=artifact_data["artifact_type"],
                description=artifact_data.get("description"),
                file_path=artifact_data.get("file_path"),
                workflow_id=artifact_data.get("workflow_id"),
                workflow_title=artifact_data.get("workflow_title"),
                created_at=artifact_data["created_at"],
                updated_at=artifact_data["updated_at"],
                file_size=artifact_data.get("file_size", 0),
                file_size_human=format_file_size(artifact_data.get("file_size", 0)),
                importance=artifact_data.get("importance"),
                access_count=artifact_data.get("access_count", 0),
                tags=tags_list,
                metadata=metadata_dict,
                relationship_count=artifact_data.get("relationship_count", 0),
                version_count=artifact_data.get("version_count", 0),
                formatted_created_at=datetime.fromtimestamp(
                    artifact_data["created_at"]
                ).isoformat(),
                formatted_updated_at=datetime.fromtimestamp(
                    artifact_data["updated_at"]
                ).isoformat(),
                age_days=(datetime.now().timestamp() - artifact_data["created_at"]) / 86400,
            )
            artifacts.append(artifact)

        return ArtifactsResponse(
            artifacts=artifacts,
            # NOTE(review): `total` is the size of this page, not the full match
            # count, and `has_more` is a heuristic (true when the page is full).
            total=len(artifacts),
            has_more=len(artifacts) == limit,
            filters=ArtifactsFilter(
                artifact_type=artifact_type,
                workflow_id=workflow_id,
                tags=tags,
                search=search,
                sort_by=sort_by,
                sort_order=sort_order,
            ),
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Always release the connection, even when an error short-circuits above
        # (the original leaked it on every exception path).
        if conn is not None:
            conn.close()
2222 |
2223 | # ---------- Artifacts Statistics Models ----------
2224 |
class ArtifactTypeStats(BaseModel):
    """Statistics for a specific artifact type"""

    artifact_type: str = Field(
        ..., description="Type of artifact"
    )
    count: int = Field(
        ..., description="Number of artifacts of this type"
    )
    # None when no artifact of this type has an importance value.
    avg_importance: Optional[float] = Field(
        None, description="Average importance score"
    )
    total_size: int = Field(
        ..., description="Total size of all artifacts of this type"
    )
    max_access_count: int = Field(
        ..., description="Maximum access count for this type"
    )
2233 |
class ArtifactOverallStats(BaseModel):
    """Overall artifact statistics"""

    total_artifacts: int = Field(..., description="Total number of artifacts")
    unique_types: int = Field(..., description="Number of unique artifact types")
    unique_workflows: int = Field(..., description="Number of unique workflows")
    total_size: int = Field(..., description="Total size of all artifacts in bytes")
    total_size_human: str = Field(..., description="Human-readable total size")
    avg_size: float = Field(..., description="Average artifact size in bytes")
    # Epoch timestamps; None when the artifacts table is empty.
    latest_created: Optional[float] = Field(None, description="Timestamp of most recent artifact")
    earliest_created: Optional[float] = Field(None, description="Timestamp of oldest artifact")
2249 |
class ArtifactStatsResponse(BaseModel):
    """Response model for artifact statistics"""

    overall: ArtifactOverallStats = Field(
        ..., description="Overall statistics"
    )
    by_type: List[ArtifactTypeStats] = Field(
        ..., description="Statistics broken down by type"
    )
2257 |
2258 | # ---------- Artifacts Statistics Endpoint ----------
2259 |
@app.get(
    "/artifacts/stats",
    response_model=ArtifactStatsResponse,
    tags=["Artifacts"],
    summary="Get artifact statistics and analytics",
    description="""
    Retrieve comprehensive statistics about system artifacts including:

    - **Overall counts** and storage usage
    - **Type-based breakdown** with metrics per artifact type
    - **Importance scoring** averages and distributions
    - **Access patterns** and usage statistics

    Perfect for understanding artifact distribution and usage patterns across the system.
    """,
    responses={
        200: {
            "description": "Comprehensive artifact statistics and analytics",
            "content": {
                "application/json": {
                    "example": {
                        "overall": {
                            "total_artifacts": 150,
                            "unique_types": 5,
                            "unique_workflows": 25,
                            "total_size": 1073741824,
                            "total_size_human": "1.0 GB",
                            "avg_size": 7158279,
                            "latest_created": 1704067200.0,
                            "earliest_created": 1703980800.0,
                        },
                        "by_type": [
                            {
                                "artifact_type": "document",
                                "count": 75,
                                "avg_importance": 7.5,
                                "total_size": 536870912,
                                "max_access_count": 50,
                            },
                            {
                                "artifact_type": "image",
                                "count": 50,
                                "avg_importance": 6.0,
                                "total_size": 268435456,
                                "max_access_count": 30,
                            },
                        ],
                    }
                }
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def get_artifact_stats() -> ArtifactStatsResponse:
    """Get artifact statistics and analytics.

    Computes system-wide artifact counts and size totals plus a per-type
    breakdown, tolerating an empty artifacts table.

    Raises:
        HTTPException: 500 for database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Overall statistics
        cursor.execute("""
            SELECT
                COUNT(*) as total_artifacts,
                COUNT(DISTINCT artifact_type) as unique_types,
                COUNT(DISTINCT workflow_id) as unique_workflows,
                SUM(file_size) as total_size,
                AVG(file_size) as avg_size,
                MAX(created_at) as latest_created,
                MIN(created_at) as earliest_created
            FROM artifacts
        """)

        result = cursor.fetchone()
        overall_dict = (
            dict(zip([d[0] for d in cursor.description], result, strict=False))
            if result
            else {}
        )

        # SUM()/AVG() return NULL (None) when the table is empty, which would
        # crash format_file_size and fail the non-optional numeric fields, so
        # coalesce them to 0 here.
        total_size = overall_dict.get("total_size") or 0

        overall = ArtifactOverallStats(
            total_artifacts=overall_dict.get("total_artifacts", 0),
            unique_types=overall_dict.get("unique_types", 0),
            unique_workflows=overall_dict.get("unique_workflows", 0),
            total_size=total_size,
            total_size_human=format_file_size(total_size),
            avg_size=overall_dict.get("avg_size") or 0,
            latest_created=overall_dict.get("latest_created"),
            earliest_created=overall_dict.get("earliest_created"),
        )

        # Type-based statistics
        cursor.execute("""
            SELECT
                artifact_type,
                COUNT(*) as count,
                AVG(importance) as avg_importance,
                SUM(file_size) as total_size,
                MAX(access_count) as max_access_count
            FROM artifacts
            GROUP BY artifact_type
            ORDER BY count DESC
        """)

        by_type = [
            ArtifactTypeStats(
                artifact_type=row[0],
                count=row[1],
                avg_importance=row[2],
                total_size=row[3] or 0,
                max_access_count=row[4] or 0,
            )
            for row in cursor.fetchall()
        ]

        return ArtifactStatsResponse(overall=overall, by_type=by_type)

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
    finally:
        # Always release the connection, even when an error short-circuits above
        # (the original leaked it on every exception path).
        if conn is not None:
            conn.close()
2385 |
2386 | # ---------- Memory Quality Pydantic Models ----------
2387 |
class MemoryDetail(BaseModel):
    """Detailed information about a memory"""

    memory_id: str = Field(
        ..., description="Unique memory identifier"
    )
    workflow_id: Optional[str] = Field(
        None, description="Associated workflow ID"
    )
    memory_type: str = Field(
        ..., description="Type of memory"
    )
    importance: float = Field(
        ..., description="Importance score"
    )
    created_at: float = Field(
        ..., description="Creation timestamp"
    )
2395 |
class DuplicateGroup(BaseModel):
    """Group of duplicate memories"""

    cluster_id: str = Field(
        ..., description="Unique identifier for this duplicate cluster"
    )
    content_preview: str = Field(
        ..., description="Preview of the duplicated content"
    )
    duplicate_count: int = Field(
        ..., description="Number of duplicates in this group"
    )
    memory_ids: List[str] = Field(
        ..., description="List of all memory IDs in this group"
    )
    primary_memory_id: str = Field(
        ..., description="Suggested primary memory to keep"
    )
    memory_details: List[MemoryDetail] = Field(
        ..., description="Detailed info for each memory"
    )
    # Epoch timestamps bounding the cluster's creation range.
    first_created: float = Field(
        ..., description="Timestamp of earliest duplicate"
    )
    last_created: float = Field(
        ..., description="Timestamp of latest duplicate"
    )
    avg_importance: float = Field(
        ..., description="Average importance across duplicates"
    )
    recommendation: str = Field(
        ..., description="Recommended action (merge/review)"
    )
2408 |
class DuplicatesResponse(BaseModel):
    """Response model for duplicate analysis"""

    success: bool = Field(
        ..., description="Whether analysis completed successfully"
    )
    clusters: List[DuplicateGroup] = Field(
        ..., description="List of duplicate groups"
    )
    # Backward-compatibility alias for `clusters` (see its description).
    duplicate_groups: List[DuplicateGroup] = Field(
        ..., description="Alias for clusters (backward compatibility)"
    )
    total_groups: int = Field(
        ..., description="Total number of duplicate groups found"
    )
    total_duplicates: int = Field(
        ..., description="Total number of duplicate memories"
    )
2416 |
class OrphanedMemory(BaseModel):
    """Model for an orphaned memory"""

    memory_id: str = Field(
        ..., description="Unique memory identifier"
    )
    content: str = Field(
        ..., description="Memory content"
    )
    memory_type: str = Field(
        ..., description="Type of memory"
    )
    importance: float = Field(
        ..., description="Importance score"
    )
    created_at: float = Field(
        ..., description="Creation timestamp"
    )
2424 |
class OrphanedMemoriesResponse(BaseModel):
    """Response model for orphaned memories"""

    success: bool = Field(
        ..., description="Whether query completed successfully"
    )
    orphaned_memories: List[OrphanedMemory] = Field(
        ..., description="List of orphaned memories"
    )
    total_orphaned: int = Field(
        ..., description="Total count of orphaned memories"
    )
    recommendation: str = Field(
        ..., description="Recommended action for orphaned memories"
    )
2431 |
class BulkOperationRequest(BaseModel):
    """Request model for bulk operations"""

    # Restricted to the closed set of supported operations.
    operation_type: str = Field(..., description="Type of bulk operation to perform", regex="^(delete|archive|merge)$")
    # At least one target memory is required.
    memory_ids: List[str] = Field(..., description="List of memory IDs to operate on", min_items=1)
    # Only meaningful for merge operations.
    target_memory_id: Optional[str] = Field(None, description="Target memory ID for merge operations")
2448 |
class BulkOperationResponse(BaseModel):
    """Response model for bulk operations"""

    success: bool = Field(
        ..., description="Whether operation completed successfully"
    )
    operation_type: str = Field(
        ..., description="Type of operation performed"
    )
    memory_ids: List[str] = Field(
        ..., description="Memory IDs that were operated on"
    )
    # Per-item outcome tallies.
    success_count: int = Field(
        ..., description="Number of successful operations"
    )
    error_count: int = Field(
        ..., description="Number of failed operations"
    )
    message: str = Field(
        ..., description="Summary message of the operation"
    )
    errors: List[str] = Field(
        default_factory=list, description="List of error messages"
    )
    # Populated only for merge operations.
    merged_into: Optional[str] = Field(
        None, description="Target memory ID for merge operations"
    )
2459 |
class PreviewMemory(BaseModel):
    """Memory preview for bulk operations (subset of memory fields shown in previews)."""
    memory_id: str = Field(..., description="Memory ID")
    content: str = Field(..., description="Memory content")
    memory_type: str = Field(..., description="Type of memory")
    importance: float = Field(..., description="Importance score")
    workflow_id: Optional[str] = Field(None, description="Associated workflow")
2467 |
class BulkOperationPreview(BaseModel):
    """Preview of bulk operation effects.

    merge_target and will_be_deleted are populated only for merge previews
    involving more than one affected memory.
    """
    operation_type: str = Field(..., description="Type of operation to be performed")
    total_affected: int = Field(..., description="Total memories that will be affected")
    preview_description: str = Field(..., description="Description of what will happen")
    affected_memories: List[PreviewMemory] = Field(..., description="Details of affected memories")
    merge_target: Optional[PreviewMemory] = Field(None, description="Target memory for merge")
    will_be_deleted: Optional[List[PreviewMemory]] = Field(None, description="Memories to be deleted in merge")
2476 |
class BulkPreviewResponse(BaseModel):
    """Response model for bulk operation preview (no data is modified)."""
    success: bool = Field(..., description="Whether preview generated successfully")
    operation: BulkOperationPreview = Field(..., description="Preview of the operation")
2481 |
2482 | # ---------- Memory Quality Endpoints ----------
2483 |
@app.get(
    "/memory-quality/duplicates",
    response_model=DuplicatesResponse,
    tags=["Memory Quality"],
    summary="Get detailed duplicate memory analysis",
    description="""
    Retrieve comprehensive information about duplicate memories:

    - **Duplicate clusters** with identical content
    - **Memory details** for each duplicate group
    - **Merge recommendations** based on duplicate count
    - **Temporal analysis** of when duplicates were created

    Essential for understanding and resolving memory duplication issues.
    """,
    responses={
        200: {
            "description": "Duplicate analysis successfully retrieved",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "clusters": [
                            {
                                "cluster_id": "dup_cluster_0",
                                "content_preview": "System initialized successfully with all providers...",
                                "duplicate_count": 3,
                                "memory_ids": ["mem_123", "mem_456", "mem_789"],
                                "primary_memory_id": "mem_123",
                                "memory_details": [
                                    {
                                        "memory_id": "mem_123",
                                        "workflow_id": "workflow_001",
                                        "memory_type": "system",
                                        "importance": 8.0,
                                        "created_at": 1703980800.0
                                    }
                                ],
                                "first_created": 1703980800.0,
                                "last_created": 1703984400.0,
                                "avg_importance": 7.5,
                                "recommendation": "merge"
                            }
                        ],
                        "duplicate_groups": [],
                        "total_groups": 1,
                        "total_duplicates": 3
                    }
                }
            }
        },
        500: {"description": "Internal server error"}
    }
)
async def get_duplicate_memories() -> DuplicatesResponse:
    """Get detailed duplicate memory analysis.

    Groups memories by identical content (skipping very short entries),
    collects per-memory detail for each group, and attaches a merge/review
    recommendation based on how many duplicates exist.

    Returns:
        DuplicatesResponse with one DuplicateGroup per duplicated content
        string, ordered by duplicate count descending.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Content strings shared by more than one memory; content of <= 10
        # chars is ignored to avoid trivial collisions.
        cursor.execute("""
            SELECT content, COUNT(*) as count, GROUP_CONCAT(memory_id) as memory_ids,
                   MIN(created_at) as first_created, MAX(created_at) as last_created,
                   AVG(importance) as avg_importance
            FROM memories
            WHERE content IS NOT NULL AND LENGTH(content) > 10
            GROUP BY content
            HAVING count > 1
            ORDER BY count DESC
        """)

        duplicate_groups = []
        for i, row in enumerate(cursor.fetchall()):
            memory_ids = row[2].split(',')

            # Fetch detail for every member of the group.
            # NOTE: N+1 query pattern; acceptable while groups stay small.
            memory_details = []
            for memory_id in memory_ids:
                cursor.execute("""
                    SELECT memory_id, workflow_id, memory_type, importance, created_at
                    FROM memories WHERE memory_id = ?
                """, (memory_id,))

                detail = cursor.fetchone()
                if detail:
                    memory_details.append(MemoryDetail(
                        memory_id=detail[0],
                        workflow_id=detail[1],
                        memory_type=detail[2],
                        importance=detail[3],
                        created_at=detail[4]
                    ))

            duplicate_group = DuplicateGroup(
                cluster_id=f"dup_cluster_{i}",
                content_preview=row[0][:200] + '...' if len(row[0]) > 200 else row[0],
                duplicate_count=row[1],
                memory_ids=memory_ids,
                primary_memory_id=memory_ids[0] if memory_ids else "",
                memory_details=memory_details,
                first_created=row[3],
                last_created=row[4],
                avg_importance=round(row[5], 1) if row[5] else 0.0,
                recommendation='merge' if row[1] > 2 else 'review'
            )
            duplicate_groups.append(duplicate_group)

        total_duplicates = sum(group.duplicate_count for group in duplicate_groups)

        return DuplicatesResponse(
            success=True,
            clusters=duplicate_groups,
            duplicate_groups=duplicate_groups,  # For backward compatibility
            total_groups=len(duplicate_groups),
            total_duplicates=total_duplicates
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Fix: close the connection on error paths too (was leaked on exception).
        if conn is not None:
            conn.close()
2607 |
@app.get(
    "/memory-quality/orphaned",
    response_model=OrphanedMemoriesResponse,
    tags=["Memory Quality"],
    summary="Get orphaned memories not associated with workflows",
    description="""
    Retrieve memories that are not associated with any workflow:

    - **Orphaned memory details** including content and metadata
    - **Creation timestamps** for temporal analysis
    - **Importance scoring** to prioritize action
    - **Assignment recommendations** for workflow integration

    Critical for maintaining memory system organization and preventing data loss.
    """,
    responses={
        200: {
            "description": "Orphaned memories successfully retrieved",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "orphaned_memories": [
                            {
                                "memory_id": "mem_999",
                                "content": "Important insight that got disconnected from workflow",
                                "memory_type": "analysis",
                                "importance": 7.5,
                                "created_at": 1703980800.0
                            }
                        ],
                        "total_orphaned": 1,
                        "recommendation": "Assign to appropriate workflows or archive if no longer needed"
                    }
                }
            }
        },
        500: {"description": "Internal server error"}
    }
)
async def get_orphaned_memories() -> OrphanedMemoriesResponse:
    """Get orphaned memories (not associated with workflows).

    A memory counts as orphaned when its workflow_id does not join to an
    existing workflow row (LEFT JOIN produces NULL), newest first.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # LEFT JOIN + IS NULL selects memories whose workflow is missing.
        cursor.execute("""
            SELECT m.memory_id, m.content, m.memory_type, m.importance, m.created_at
            FROM memories m
            LEFT JOIN workflows w ON m.workflow_id = w.workflow_id
            WHERE w.workflow_id IS NULL
            ORDER BY m.created_at DESC
        """)

        orphaned_memories = [
            OrphanedMemory(
                memory_id=row[0],
                content=row[1],
                memory_type=row[2],
                importance=row[3],
                created_at=row[4]
            )
            for row in cursor.fetchall()
        ]

        return OrphanedMemoriesResponse(
            success=True,
            orphaned_memories=orphaned_memories,
            total_orphaned=len(orphaned_memories),
            recommendation='Assign to appropriate workflows or archive if no longer needed'
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Fix: close the connection on error paths too (was leaked on exception).
        if conn is not None:
            conn.close()
2686 |
@app.post(
    "/memory-quality/bulk-execute",
    response_model=BulkOperationResponse,
    tags=["Memory Quality"],
    summary="Execute bulk operations on memories",
    description="""
    Perform bulk operations on multiple memories:

    - **Merge operations** for duplicate consolidation
    - **Archive operations** for stale memory management
    - **Delete operations** for cleanup
    - **Progress tracking** and error reporting

    Enables efficient bulk management of memory quality issues.
    """,
    responses={
        200: {
            "description": "Bulk operation completed",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "operation_type": "merge",
                        "memory_ids": ["mem_456", "mem_789"],
                        "success_count": 2,
                        "error_count": 0,
                        "message": "Operation completed: 2 succeeded, 0 failed",
                        "errors": [],
                        "merged_into": "mem_123"
                    }
                }
            }
        },
        400: {"description": "Invalid request parameters"},
        500: {"description": "Internal server error"}
    }
)
async def execute_bulk_memory_operations(
    bulk_request: BulkOperationRequest
) -> BulkOperationResponse:
    """Execute bulk operations on memories.

    Supports 'delete', 'archive' (marks metadata.archived via json_set) and
    'merge' (keeps the target memory, deletes the rest).

    Raises:
        HTTPException: 400 for an empty id list, 500 on database errors.
    """
    if not bulk_request.memory_ids:
        raise HTTPException(status_code=400, detail="No memory IDs provided")

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        success_count = 0
        errors = []
        merge_target_id = None  # the memory that actually survives a merge

        placeholders = ','.join(['?' for _ in bulk_request.memory_ids])

        if bulk_request.operation_type == 'delete':
            try:
                cursor.execute(f"DELETE FROM memories WHERE memory_id IN ({placeholders})", bulk_request.memory_ids)
                success_count = cursor.rowcount
            except Exception as e:
                errors.append(str(e))

        elif bulk_request.operation_type == 'archive':
            # Mark as archived via metadata instead of removing rows.
            try:
                cursor.execute(f"""
                    UPDATE memories
                    SET metadata = json_set(COALESCE(metadata, '{{}}'), '$.archived', 'true', '$.archived_at', ?)
                    WHERE memory_id IN ({placeholders})
                """, [datetime.now().isoformat()] + bulk_request.memory_ids)
                success_count = cursor.rowcount
            except Exception as e:
                errors.append(str(e))

        elif bulk_request.operation_type == 'merge':
            # Keep the target memory (explicit target or the first id) and
            # delete the remaining duplicates.
            if len(bulk_request.memory_ids) > 1:
                try:
                    merge_target_id = bulk_request.target_memory_id or bulk_request.memory_ids[0]
                    memories_to_delete = [mid for mid in bulk_request.memory_ids if mid != merge_target_id]

                    if memories_to_delete:
                        cursor.execute(
                            f"DELETE FROM memories WHERE memory_id IN ({','.join(['?' for _ in memories_to_delete])})",
                            memories_to_delete
                        )
                    # NOTE: the surviving target is not counted as a "success",
                    # so error_count below includes it — kept as-is for
                    # backward compatibility with existing clients.
                    success_count = len(memories_to_delete)
                except Exception as e:
                    errors.append(str(e))

        # Commit changes
        conn.commit()

        error_count = len(bulk_request.memory_ids) - success_count

        return BulkOperationResponse(
            success=len(errors) == 0,
            operation_type=bulk_request.operation_type,
            memory_ids=bulk_request.memory_ids,
            success_count=success_count,
            error_count=error_count,
            message=f"Operation completed: {success_count} succeeded, {error_count} failed",
            errors=errors,
            # Fix: report the memory that actually survived the merge even when
            # the caller omitted target_memory_id (previously reported None).
            merged_into=merge_target_id
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Fix: close the connection on error paths too (was leaked on exception).
        if conn is not None:
            conn.close()
2797 |
@app.post(
    "/memory-quality/bulk-preview",
    response_model=BulkPreviewResponse,
    tags=["Memory Quality"],
    summary="Preview bulk operations before execution",
    description="""
    Preview the effects of bulk operations before executing them:

    - **Operation impact preview** with affected memories
    - **Risk assessment** for destructive operations
    - **Merge target selection** for duplicate operations
    - **Cost estimation** for large operations

    Essential for safe bulk operations and preventing accidental data loss.
    """,
    responses={
        200: {
            "description": "Preview generated successfully",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "operation": {
                            "operation_type": "merge",
                            "total_affected": 3,
                            "preview_description": "This will merge 3 memories",
                            "affected_memories": [
                                {
                                    "memory_id": "mem_123",
                                    "content": "System initialized successfully",
                                    "memory_type": "system",
                                    "importance": 8.0,
                                    "workflow_id": "workflow_001"
                                }
                            ],
                            "merge_target": {
                                "memory_id": "mem_123",
                                "content": "System initialized successfully",
                                "memory_type": "system",
                                "importance": 8.0,
                                "workflow_id": "workflow_001"
                            },
                            "will_be_deleted": []
                        }
                    }
                }
            }
        },
        400: {"description": "Invalid request parameters"},
        500: {"description": "Internal server error"}
    }
)
async def preview_bulk_operations(
    bulk_request: BulkOperationRequest
) -> BulkPreviewResponse:
    """Preview bulk operations before execution.

    Looks up the affected memories and, for merge operations, identifies the
    surviving target and the memories that would be deleted. No data is
    modified.

    Raises:
        HTTPException: 400 for an empty id list, 500 on database errors.
    """
    # Fix: guard against an empty IN () clause, which is a SQL syntax error
    # (consistent with the bulk-execute endpoint's 400 check).
    if not bulk_request.memory_ids:
        raise HTTPException(status_code=400, detail="No memory IDs provided")

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Get memory details for preview
        placeholders = ','.join(['?' for _ in bulk_request.memory_ids])
        cursor.execute(f"""
            SELECT memory_id, content, memory_type, importance, workflow_id
            FROM memories
            WHERE memory_id IN ({placeholders})
        """, bulk_request.memory_ids)

        memories = [
            PreviewMemory(
                memory_id=row[0],
                content=row[1],
                memory_type=row[2],
                importance=row[3],
                workflow_id=row[4]
            )
            for row in cursor.fetchall()
        ]

        preview = BulkOperationPreview(
            operation_type=bulk_request.operation_type,
            total_affected=len(memories),
            preview_description=f'This will {bulk_request.operation_type} {len(memories)} memories',
            affected_memories=memories
        )

        # Merge previews also show which memory survives and which are removed.
        # NOTE(review): default target here is the first row in DB result order,
        # while bulk-execute defaults to memory_ids[0] (request order) — confirm
        # these should match.
        if bulk_request.operation_type == 'merge' and len(memories) > 1:
            target_id = bulk_request.target_memory_id or memories[0].memory_id
            preview.merge_target = next((m for m in memories if m.memory_id == target_id), memories[0])
            preview.will_be_deleted = [m for m in memories if m.memory_id != target_id]

        return BulkPreviewResponse(
            success=True,
            operation=preview
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Fix: close the connection on error paths too (was leaked on exception).
        if conn is not None:
            conn.close()
2900 |
2901 | # ---------- Working Memory System Implementation ----------
2902 |
2903 | from collections import defaultdict, deque
2904 | from threading import Lock
2905 |
2906 | from fastapi import Body
2907 |
# Global working memory instance, lazily created by get_working_memory_system()
# or replaced wholesale by the /working-memory/initialize endpoint.
_working_memory_system = None
# Guards creation/replacement of the singleton above.
_working_memory_lock = Lock()
2911 |
class WorkingMemorySystem:
    """
    Working memory system for managing active memories with focus capabilities.

    Maintains a bounded pool of recent memories with relevance scoring and an
    optional focus mode that boosts/filters memories matching keywords.

    NOTE(review): instances are not internally synchronized; the module-level
    lock only guards singleton creation — confirm handlers never mutate one
    instance concurrently.
    """

    def __init__(self, capacity: int = 100, focus_threshold: float = 0.7):
        """
        Args:
            capacity: Maximum number of memories kept in the pool.
            focus_threshold: Minimum relevance (0-1) for a memory to count as
                "active" while focus mode is enabled.
        """
        self.capacity = capacity
        self.focus_threshold = focus_threshold
        # maxlen is only a backstop; eviction happens explicitly in
        # add_memory() so the side indexes below never go stale.
        self.memory_pool = deque(maxlen=capacity)
        self.focus_mode_enabled = False
        self.focus_keywords: List[str] = []
        self.memory_index = {}  # memory_id -> memory dict
        self.category_index = defaultdict(list)  # category -> [memory_ids]
        self.access_counts = defaultdict(int)  # memory_id -> access count
        self.relevance_scores = {}  # memory_id -> relevance score (0-1)
        self.initialized_at = datetime.now()
        self.last_optimization = datetime.now()
        self.optimization_count = 0

    def add_memory(self, memory_id: str, content: str, category: str, importance: float = 5.0):
        """Add a memory to the working pool, evicting the oldest if full.

        Fix: previously, appending to a full deque silently dropped the
        oldest memory while leaving stale entries in memory_index,
        category_index, access_counts and relevance_scores (a later
        remove_memory() on an evicted id would then raise ValueError).
        Eviction now goes through remove_memory() so bookkeeping stays
        consistent.
        """
        memory = {
            'memory_id': memory_id,
            'content': content,
            'category': category,
            'importance': importance,
            'added_at': datetime.now().timestamp(),
            'last_accessed': datetime.now().timestamp()
        }

        # Replace an existing entry with the same id.
        if memory_id in self.memory_index:
            self.remove_memory(memory_id)

        # Explicitly evict the oldest entries before the deque would.
        while len(self.memory_pool) >= self.capacity:
            self.remove_memory(self.memory_pool[0]['memory_id'])

        # Add to pool and indexes
        self.memory_pool.append(memory)
        self.memory_index[memory_id] = memory
        self.category_index[category].append(memory_id)

        # Calculate initial relevance
        self._calculate_relevance(memory)

    def remove_memory(self, memory_id: str):
        """Remove a memory and all of its bookkeeping from the working pool."""
        if memory_id in self.memory_index:
            memory = self.memory_index[memory_id]
            self.memory_pool.remove(memory)
            del self.memory_index[memory_id]
            category = memory['category']
            self.category_index[category].remove(memory_id)
            if not self.category_index[category]:
                # Drop empty categories so statistics don't report zero counts.
                del self.category_index[category]
            self.relevance_scores.pop(memory_id, None)
            self.access_counts.pop(memory_id, None)

    def access_memory(self, memory_id: str):
        """Record a memory access and refresh its relevance score."""
        if memory_id in self.memory_index:
            self.access_counts[memory_id] += 1
            self.memory_index[memory_id]['last_accessed'] = datetime.now().timestamp()
            self._calculate_relevance(self.memory_index[memory_id])

    def set_focus_mode(self, enabled: bool, keywords: Optional[List[str]] = None):
        """Enable or disable focus mode with optional keywords."""
        self.focus_mode_enabled = enabled
        self.focus_keywords = keywords or []

        # Relevance depends on focus keywords, so recalculate everything.
        for memory in self.memory_pool:
            self._calculate_relevance(memory)

    def _calculate_relevance(self, memory: dict):
        """Compute and store a 0-1 relevance score for one memory.

        Combines normalized importance, recency (linear 24h decay floored at
        0.1), access frequency (saturating at 10 accesses) and, in focus
        mode, a keyword-match boost capped at 2x.
        """
        base_score = memory['importance'] / 10.0  # normalize importance to 0-1

        # Recency factor: linear decay over 24 hours, never below 0.1.
        age_hours = (datetime.now().timestamp() - memory['added_at']) / 3600
        recency_factor = max(0.1, 1.0 - (age_hours / 24))

        # Access frequency factor, saturating at 10 accesses.
        access_factor = min(1.0, self.access_counts[memory['memory_id']] / 10.0)

        # Focus mode boost: +0.5 per matched keyword, capped at 2.0.
        focus_factor = 1.0
        if self.focus_mode_enabled and self.focus_keywords:
            content_lower = memory['content'].lower()
            keyword_matches = sum(1 for kw in self.focus_keywords if kw.lower() in content_lower)
            focus_factor = min(2.0, 1.0 + (keyword_matches * 0.5))

        # Final score, clamped to 1.0.
        relevance = base_score * recency_factor * (0.5 + 0.5 * access_factor) * focus_factor
        self.relevance_scores[memory['memory_id']] = min(1.0, relevance)

    def get_active_memories(self, limit: Optional[int] = None) -> List[dict]:
        """Return active memories sorted by relevance, highest first.

        In focus mode, only memories at or above focus_threshold qualify.
        NOTE: limit=0 is treated like "no limit" (falsy check kept for
        backward compatibility).
        """
        memories = list(self.memory_pool)

        # Filter by focus threshold if in focus mode
        if self.focus_mode_enabled:
            memories = [m for m in memories if self.relevance_scores.get(m['memory_id'], 0) >= self.focus_threshold]

        # Sort by relevance
        memories.sort(key=lambda m: self.relevance_scores.get(m['memory_id'], 0), reverse=True)

        if limit:
            memories = memories[:limit]

        return memories

    def get_statistics(self) -> dict:
        """Return working memory statistics as a plain dict."""
        active_memories = self.get_active_memories()

        # Per-category memory counts.
        category_dist = {}
        for category, memory_ids in self.category_index.items():
            category_dist[category] = len(memory_ids)

        # Average relevance across the current pool.
        relevance_values = list(self.relevance_scores.values())
        avg_relevance = sum(relevance_values) / len(relevance_values) if relevance_values else 0

        return {
            'total_memories': len(self.memory_pool),
            'active_memories': len(active_memories),
            'capacity_used': len(self.memory_pool) / self.capacity * 100,  # percent
            'avg_relevance_score': avg_relevance,
            'category_distribution': category_dist,
            'total_accesses': sum(self.access_counts.values()),
            'optimization_suggestions': self._get_optimization_suggestions()
        }

    def _get_optimization_suggestions(self) -> int:
        """Count heuristic optimization suggestions (0-3)."""
        suggestions = 0

        # More than 20% of capacity sitting below 0.3 relevance.
        low_relevance = sum(1 for score in self.relevance_scores.values() if score < 0.3)
        if low_relevance > self.capacity * 0.2:
            suggestions += 1

        # More than 30% of capacity untouched for over an hour.
        now = datetime.now().timestamp()
        stale_memories = sum(1 for m in self.memory_pool if (now - m['last_accessed']) > 3600)
        if stale_memories > self.capacity * 0.3:
            suggestions += 1

        # One category holding more than half of all memories.
        if self.category_index:
            sizes = [len(ids) for ids in self.category_index.values()]
            if max(sizes) > sum(sizes) * 0.5:
                suggestions += 1

        return suggestions

    def optimize(self):
        """Remove memories whose relevance fell below 0.2.

        Returns:
            Number of memories removed.
        """
        to_remove = [
            m['memory_id'] for m in self.memory_pool
            if self.relevance_scores.get(m['memory_id'], 0) < 0.2
        ]

        for memory_id in to_remove:
            self.remove_memory(memory_id)

        self.last_optimization = datetime.now()
        self.optimization_count += 1

        return len(to_remove)
3084 |
def get_working_memory_system() -> WorkingMemorySystem:
    """Return the shared WorkingMemorySystem, creating it on first use.

    Creation is serialized by the module-level lock so concurrent callers
    all receive the same instance.
    """
    global _working_memory_system

    with _working_memory_lock:
        if _working_memory_system is None:
            # First caller constructs the default-configured pool.
            _working_memory_system = WorkingMemorySystem()
    return _working_memory_system
3093 |
3094 | # ---------- Working Memory Pydantic Models ----------
3095 |
class FocusMode(BaseModel):
    """Focus mode configuration (mirrors WorkingMemorySystem focus state)"""
    enabled: bool = Field(..., description="Whether focus mode is enabled")
    focus_keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering")
3100 |
class PerformanceMetrics(BaseModel):
    """Working memory performance metrics"""
    avg_relevance_score: float = Field(..., description="Average relevance score across all memories")  # 0-1
    optimization_suggestions: int = Field(..., description="Number of optimization suggestions")  # 0-3 heuristics
3105 |
class WorkingMemoryStatus(BaseModel):
    """Complete working memory system status (see /working-memory/status)"""
    initialized: bool = Field(..., description="Whether the system is initialized")
    total_capacity: int = Field(..., description="Maximum memory capacity")
    current_size: int = Field(..., description="Current number of memories in pool")
    utilization_percentage: float = Field(..., description="Percentage of capacity used")
    focus_mode: FocusMode = Field(..., description="Focus mode configuration")
    performance_metrics: PerformanceMetrics = Field(..., description="Performance metrics")
    category_distribution: Dict[str, int] = Field(default_factory=dict, description="Memory count by category")
    last_optimization: str = Field(..., description="ISO timestamp of last optimization")
    optimization_count: int = Field(..., description="Total number of optimizations performed")
3117 |
class InitializeRequest(BaseModel):
    """Request model for initializing working memory"""
    capacity: int = Field(
        100,
        ge=10,
        le=1000,
        description="Maximum number of memories in working pool"
    )
    focus_threshold: float = Field(
        0.7,
        ge=0.0,
        le=1.0,
        description="Relevance threshold for focus mode"
    )
3132 |
class InitializeResponse(BaseModel):
    """Response model for working-memory initialization"""
    success: bool = Field(..., description="Whether initialization was successful")
    message: str = Field(..., description="Status message")
    configuration: Dict[str, Any] = Field(..., description="Applied configuration")  # capacity, focus_threshold, initialized_at
3138 |
class MemoryItem(BaseModel):
    """Model for a memory in the working pool"""
    memory_id: str = Field(..., description="Unique memory identifier")
    content: str = Field(..., description="Memory content")
    category: str = Field(..., description="Memory category")
    importance: float = Field(..., description="Importance score (0-10)")
    relevance_score: float = Field(..., description="Current relevance score (0-1)")
    added_at: float = Field(..., description="Timestamp when added to working memory")  # Unix epoch seconds
    last_accessed: float = Field(..., description="Timestamp of last access")  # Unix epoch seconds
    access_count: int = Field(..., description="Number of times accessed")
3149 |
class ActiveMemoriesResponse(BaseModel):
    """Response for active memories query"""
    memories: List[MemoryItem] = Field(..., description="List of active memories sorted by relevance")
    total_count: int = Field(..., description="Total number of memories matching criteria")
    focus_active: bool = Field(..., description="Whether focus mode filtering is active")
3155 |
class SetFocusModeRequest(BaseModel):
    """Request to set focus mode.

    NOTE(review): `max_items` is a Pydantic v1 Field kwarg (v2 uses
    `max_length`) — confirm the installed Pydantic major version.
    """
    enabled: bool = Field(..., description="Enable or disable focus mode")
    keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering", max_items=20)
3160 |
class OptimizeResponse(BaseModel):
    """Response for working-memory optimization operation"""
    success: bool = Field(..., description="Whether optimization was successful")
    removed_count: int = Field(..., description="Number of memories removed")
    message: str = Field(..., description="Optimization result message")
3166 |
3167 | # ---------- Working Memory Endpoints ----------
3168 |
@app.get(
    "/working-memory/status",
    response_model=WorkingMemoryStatus,
    tags=["Working Memory"],
    summary="Get working memory system status",
    description="""
    Retrieve the current status and configuration of the working memory system:

    - **Pool utilization** and capacity metrics
    - **Focus mode** status and configuration
    - **Optimization statistics** and performance data
    - **Memory distribution** across different categories

    Essential for monitoring working memory health and performance optimization.
    """,
    responses={
        200: {
            "description": "Working memory status retrieved successfully",
            "content": {
                "application/json": {
                    "example": {
                        "initialized": True,
                        "total_capacity": 100,
                        "current_size": 45,
                        "utilization_percentage": 45.0,
                        "focus_mode": {
                            "enabled": True,
                            "focus_keywords": ["document", "analysis", "pdf"]
                        },
                        "performance_metrics": {
                            "avg_relevance_score": 0.72,
                            "optimization_suggestions": 2
                        },
                        "category_distribution": {
                            "reasoning": 15,
                            "observation": 20,
                            "decision": 10
                        },
                        "last_optimization": "2024-01-01T12:30:00",
                        "optimization_count": 5
                    }
                }
            }
        },
        500: {
            "description": "Internal server error",
            "content": {
                "application/json": {
                    "example": {"detail": "Failed to retrieve working memory status"}
                }
            }
        }
    }
)
async def get_working_memory_status() -> WorkingMemoryStatus:
    """Report the current working-memory pool status, focus config and metrics."""
    try:
        system = get_working_memory_system()
        snapshot = system.get_statistics()

        # Build the nested sub-models first, then assemble the response.
        focus = FocusMode(
            enabled=system.focus_mode_enabled,
            focus_keywords=system.focus_keywords
        )
        metrics = PerformanceMetrics(
            avg_relevance_score=snapshot['avg_relevance_score'],
            optimization_suggestions=snapshot['optimization_suggestions']
        )

        return WorkingMemoryStatus(
            initialized=True,
            total_capacity=system.capacity,
            current_size=snapshot['total_memories'],
            utilization_percentage=snapshot['capacity_used'],
            focus_mode=focus,
            performance_metrics=metrics,
            category_distribution=snapshot['category_distribution'],
            last_optimization=system.last_optimization.isoformat(),
            optimization_count=system.optimization_count
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve working memory status: {str(e)}") from e
3249 |
@app.post(
    "/working-memory/initialize",
    response_model=InitializeResponse,
    tags=["Working Memory"],
    summary="Initialize working memory system",
    description="""
    Initialize or reinitialize the working memory system with specific configuration:

    - **System initialization** with capacity settings
    - **Configuration setup** for optimization parameters
    - **Pool preparation** for memory operations
    - **Performance tuning** based on usage patterns

    Required before other working memory operations can be performed effectively.
    """,
    responses={
        200: {
            "description": "Working memory initialized successfully",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "message": "Working memory system initialized with capacity 150",
                        "configuration": {
                            "capacity": 150,
                            "focus_threshold": 0.8,
                            "initialized_at": "2024-01-01T12:00:00"
                        }
                    }
                }
            }
        },
        400: {
            "description": "Invalid configuration parameters",
            "content": {
                "application/json": {
                    "example": {"detail": "Capacity must be between 10 and 1000"}
                }
            }
        },
        500: {
            "description": "Internal server error"
        }
    }
)
async def initialize_working_memory(
    request: InitializeRequest
) -> InitializeResponse:
    """Initialize (or re-initialize) the working memory system.

    Replaces the global WorkingMemorySystem under the module lock, then makes
    a best-effort attempt to seed it with high-importance memories created in
    the last 24 hours. Initialization still succeeds if seeding fails.

    Raises:
        HTTPException: 500 if the system itself cannot be created.
    """
    global _working_memory_system

    try:
        with _working_memory_lock:
            # Replace the singleton; keep a local alias so the seeding below
            # is unaffected if another request swaps the global concurrently.
            _working_memory_system = system = WorkingMemorySystem(
                capacity=request.capacity,
                focus_threshold=request.focus_threshold
            )

        # Optionally load recent memories from database (best effort).
        try:
            conn = get_db_connection()
            try:
                cursor = conn.cursor()

                # Most important / most recent memories from the last 24 hours,
                # capped at the configured capacity.
                cursor.execute("""
                    SELECT memory_id, content, memory_type, importance
                    FROM memories
                    WHERE created_at >= ?
                    ORDER BY importance DESC, created_at DESC
                    LIMIT ?
                """, (datetime.now().timestamp() - 86400, request.capacity))

                loaded_count = 0
                for row in cursor.fetchall():
                    system.add_memory(
                        memory_id=row[0],
                        content=row[1],
                        category=row[2],  # memory_type doubles as the category
                        importance=row[3]
                    )
                    loaded_count += 1
            finally:
                # Fix: close the connection even if loading fails mid-loop
                # (was leaked on exception).
                conn.close()

            message = f"Working memory system initialized with capacity {request.capacity}, loaded {loaded_count} recent memories"
        except Exception as e:
            # Continue even if loading fails
            message = f"Working memory system initialized with capacity {request.capacity} (memory loading failed: {str(e)})"

        return InitializeResponse(
            success=True,
            message=message,
            configuration={
                "capacity": request.capacity,
                "focus_threshold": request.focus_threshold,
                "initialized_at": system.initialized_at.isoformat()
            }
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to initialize working memory: {str(e)}") from e
3352 |
@app.get(
    "/working-memory/active",
    response_model=ActiveMemoriesResponse,
    tags=["Working Memory"],
    summary="Get active memories from working pool",
    description="""
    Retrieve active memories from the working pool, sorted by relevance.

    When focus mode is enabled, only memories meeting the relevance threshold are returned.
    This endpoint is useful for understanding what memories are currently available for processing.
    """,
    responses={
        200: {
            "description": "Active memories retrieved successfully",
            "content": {
                "application/json": {
                    "example": {
                        "memories": [
                            {
                                "memory_id": "mem_001",
                                "content": "The PDF contains financial data from Q4 2023",
                                "category": "observation",
                                "importance": 8.5,
                                "relevance_score": 0.92,
                                "added_at": 1703980800.0,
                                "last_accessed": 1703981400.0,
                                "access_count": 5
                            }
                        ],
                        "total_count": 1,
                        "focus_active": True
                    }
                }
            }
        }
    }
)
async def get_active_memories(
    limit: int = Query(50, ge=1, le=200, description="Maximum number of memories to return"),
    category: Optional[str] = Query(None, description="Filter by memory category")
) -> ActiveMemoriesResponse:
    """Return the active working-memory pool, optionally filtered by category."""
    try:
        pool = get_working_memory_system()
        active = pool.get_active_memories(limit=limit)

        # Optional category filter, applied after the pool's own limit.
        if category:
            active = [entry for entry in active if entry['category'] == category]

        # Shape each pool entry into the API model, pulling relevance and
        # access stats from the pool's side tables (0 when missing).
        items = [
            MemoryItem(
                memory_id=entry['memory_id'],
                content=entry['content'],
                category=entry['category'],
                importance=entry['importance'],
                relevance_score=pool.relevance_scores.get(entry['memory_id'], 0),
                added_at=entry['added_at'],
                last_accessed=entry['last_accessed'],
                access_count=pool.access_counts.get(entry['memory_id'], 0)
            )
            for entry in active
        ]

        return ActiveMemoriesResponse(
            memories=items,
            total_count=len(items),
            focus_active=pool.focus_mode_enabled
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve active memories: {str(e)}") from e
3425 |
@app.post(
    "/working-memory/focus",
    response_model=InitializeResponse,
    tags=["Working Memory"],
    summary="Set focus mode configuration",
    description="""
    Configure focus mode for the working memory system.

    Focus mode filters memories based on relevance to specified keywords,
    helping to narrow attention to specific topics or contexts.
    """
)
async def set_focus_mode(
    request: SetFocusModeRequest
) -> InitializeResponse:
    """Enable or disable keyword-driven focus mode on the working memory system."""
    try:
        memory_system = get_working_memory_system()
        memory_system.set_focus_mode(request.enabled, request.keywords)

        # Build a human-readable summary of the new focus state.
        state = 'enabled' if request.enabled else 'disabled'
        message = f"Focus mode {state}"
        if request.enabled and request.keywords:
            message += f" with keywords: {', '.join(request.keywords)}"

        return InitializeResponse(
            success=True,
            message=message,
            configuration={
                "focus_enabled": request.enabled,
                "focus_keywords": request.keywords,
                "focus_threshold": memory_system.focus_threshold
            }
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to set focus mode: {str(e)}") from e
3462 |
@app.post(
    "/working-memory/optimize",
    response_model=OptimizeResponse,
    tags=["Working Memory"],
    summary="Optimize working memory pool",
    description="""
    Optimize the working memory pool by removing low-relevance memories.

    This operation helps maintain memory pool quality by removing memories
    with relevance scores below the optimization threshold (0.2).
    """
)
async def optimize_working_memory() -> OptimizeResponse:
    """Prune low-relevance memories from the working pool and report the count."""
    try:
        removed = get_working_memory_system().optimize()

        return OptimizeResponse(
            success=True,
            removed_count=removed,
            message=f"Optimization complete. Removed {removed} low-relevance memories."
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to optimize working memory: {str(e)}") from e
3489 |
3490 | # ---------- Performance Profiler Pydantic Models ----------
3491 |
class PerformanceOverviewStats(BaseModel):
    """Aggregate action statistics for the analysis window.

    The raw count/time fields come from SQL aggregates over the ``actions``
    table; the percentage and per-workflow averages are derived by the
    ``/performance/overview`` handler.
    """
    total_actions: int = Field(..., description="Total number of actions executed")
    active_workflows: int = Field(..., description="Number of unique workflows")
    avg_execution_time: float = Field(..., description="Average execution time in seconds")
    min_execution_time: Optional[float] = Field(None, description="Minimum execution time")
    max_execution_time: Optional[float] = Field(None, description="Maximum execution time")
    successful_actions: int = Field(..., description="Number of successful actions")
    failed_actions: int = Field(..., description="Number of failed actions")
    tools_used: int = Field(..., description="Number of distinct tools used")
    success_rate_percentage: float = Field(..., description="Success rate as percentage")
    throughput_per_hour: float = Field(..., description="Actions processed per hour")
    error_rate_percentage: float = Field(..., description="Error rate as percentage")
    avg_workflow_size: float = Field(..., description="Average actions per workflow")
3506 |
class TimelineBucket(BaseModel):
    """One bucket of the performance timeline.

    Bucket width follows the request's ``granularity`` (minute, hour, or day).
    """
    time_bucket: str = Field(..., description="Time bucket identifier")
    action_count: int = Field(..., description="Number of actions in this bucket")
    avg_duration: Optional[float] = Field(None, description="Average duration in seconds")
    successful_count: int = Field(..., description="Number of successful actions")
    failed_count: int = Field(..., description="Number of failed actions")
    workflow_count: int = Field(..., description="Number of unique workflows")
3515 |
class ToolUtilization(BaseModel):
    """Per-tool usage and duration aggregates for the analysis window."""
    tool_name: str = Field(..., description="Name of the tool")
    usage_count: int = Field(..., description="Number of times used")
    avg_duration: Optional[float] = Field(None, description="Average execution duration")
    success_count: int = Field(..., description="Number of successful executions")
    max_duration: Optional[float] = Field(None, description="Maximum execution duration")
3523 |
class Bottleneck(BaseModel):
    """A single slow completed action.

    The overview endpoint reports the ten longest-running completed actions
    in the window as bottlenecks.
    """
    tool_name: str = Field(..., description="Tool causing the bottleneck")
    workflow_id: Optional[str] = Field(None, description="Associated workflow")
    action_id: str = Field(..., description="Action identifier")
    started_at: float = Field(..., description="Start timestamp")
    completed_at: Optional[float] = Field(None, description="Completion timestamp")
    duration: float = Field(..., description="Duration in seconds")
    status: str = Field(..., description="Action status")
    reasoning: Optional[str] = Field(None, description="Action reasoning")
3534 |
class PerformanceOverviewResponse(BaseModel):
    """Envelope for ``/performance/overview``: aggregate stats, timeline
    buckets, per-tool utilization, and the slowest completed actions."""
    overview: PerformanceOverviewStats
    timeline: List[TimelineBucket]
    tool_utilization: List[ToolUtilization]
    bottlenecks: List[Bottleneck]
    analysis_period: Dict[str, Any] = Field(..., description="Analysis period information")
    timestamp: str = Field(..., description="Response generation timestamp")
3543 |
class ToolBottleneck(BaseModel):
    """Per-tool bottleneck aggregates with percentile estimates.

    NOTE(review): p95/p99 are approximated by the handler as 1.5x / 2.0x the
    average duration, not computed from the raw distribution.
    """
    tool_name: str = Field(..., description="Name of the tool")
    total_calls: int = Field(..., description="Total number of calls")
    avg_duration: float = Field(..., description="Average execution duration")
    max_duration: float = Field(..., description="Maximum execution duration")
    min_duration: float = Field(..., description="Minimum execution duration")
    p95_duration: float = Field(..., description="95th percentile duration")
    p99_duration: float = Field(..., description="99th percentile duration")
    failure_count: int = Field(..., description="Number of failures")
    total_time_spent: float = Field(..., description="Total time spent in seconds")
3555 |
class WorkflowBottleneck(BaseModel):
    """Duration aggregates for one multi-action workflow in the window."""
    workflow_id: str = Field(..., description="Workflow identifier")
    title: Optional[str] = Field(None, description="Workflow title")
    action_count: int = Field(..., description="Number of actions")
    avg_action_duration: float = Field(..., description="Average action duration")
    max_action_duration: float = Field(..., description="Maximum action duration")
    total_workflow_time: float = Field(..., description="Total workflow execution time")
    workflow_start: float = Field(..., description="Workflow start timestamp")
    workflow_end: float = Field(..., description="Workflow end timestamp")
    total_elapsed_time: float = Field(..., description="Total elapsed wall-clock time")
3567 |
class ParallelizationOpportunity(BaseModel):
    """Estimated savings from parallelizing a workflow's actions.

    Derived by comparing the sum of individual action durations against the
    workflow's wall-clock elapsed time.
    """
    workflow_id: str = Field(..., description="Workflow identifier")
    sequential_actions: int = Field(..., description="Number of sequential actions")
    total_sequential_time: float = Field(..., description="Total sequential execution time")
    actual_elapsed_time: float = Field(..., description="Actual elapsed time")
    potential_time_savings: float = Field(..., description="Potential time savings in seconds")
    parallelization_efficiency: float = Field(..., description="Current parallelization efficiency percentage")
    optimization_score: float = Field(..., description="Optimization potential score (0-10)")
3577 |
class ResourceContention(BaseModel):
    """A tool whose executions overlapped in time, with its average
    duration while contended."""
    tool_name: str = Field(..., description="Tool name")
    concurrent_usage: int = Field(..., description="Number of concurrent usages")
    avg_duration_under_contention: float = Field(..., description="Average duration when contended")
3583 |
class OptimizationRecommendation(BaseModel):
    """An actionable performance recommendation generated from the
    bottleneck analysis (tool optimization or parallelization)."""
    type: str = Field(..., description="Type of optimization")
    priority: str = Field(..., description="Priority level (high, medium, low)")
    title: str = Field(..., description="Recommendation title")
    description: str = Field(..., description="Detailed description")
    impact: str = Field(..., description="Expected impact description")
    actions: List[str] = Field(..., description="Recommended actions to take")
3592 |
class BottleneckAnalysisResponse(BaseModel):
    """Envelope for ``/performance/bottlenecks``: tool and workflow
    bottlenecks, parallelization opportunities, contention, and the
    recommendations derived from them."""
    tool_bottlenecks: List[ToolBottleneck]
    workflow_bottlenecks: List[WorkflowBottleneck]
    parallelization_opportunities: List[ParallelizationOpportunity]
    resource_contention: List[ResourceContention]
    recommendations: List[OptimizationRecommendation]
    analysis_summary: Dict[str, Any]
    timestamp: str
3602 |
3603 | # ---------- Performance Profiler Endpoints ----------
3604 |
@app.get(
    "/performance/overview",
    response_model=PerformanceOverviewResponse,
    tags=["Performance Profiler"],
    summary="Get comprehensive performance overview with metrics and trends",
    description="""
    Retrieve comprehensive workflow performance overview including:

    - **Real-time performance metrics** with execution time analysis
    - **Timeline visualization data** with configurable granularity
    - **Tool utilization statistics** and performance breakdowns
    - **Current bottlenecks** identification with severity indicators
    - **Throughput analysis** and success rate metrics

    Perfect for monitoring overall system performance and identifying optimization opportunities.
    """,
    responses={
        200: {
            "description": "Performance overview data with metrics and timeline",
            "content": {
                "application/json": {
                    "example": {
                        "overview": {
                            "total_actions": 1250,
                            "active_workflows": 45,
                            "avg_execution_time": 12.5,
                            "success_rate_percentage": 92.5,
                            "throughput_per_hour": 52.1
                        },
                        "timeline": [
                            {
                                "time_bucket": "2024-01-01 14:00:00",
                                "action_count": 45,
                                "avg_duration": 11.2,
                                "successful_count": 42,
                                "failed_count": 3
                            }
                        ]
                    }
                }
            }
        }
    }
)
async def get_performance_overview(
    hours_back: int = Query(
        24,
        description="Number of hours back to analyze performance data",
        ge=1,
        le=720,
        example=24
    ),
    granularity: str = Query(
        "hour",
        description="Time granularity for timeline data aggregation",
        regex="^(minute|hour|day)$",
        example="hour"
    )
) -> PerformanceOverviewResponse:
    """Get comprehensive performance overview with metrics and trends.

    Aggregates the `actions` table over the last `hours_back` hours into
    overall stats, a timeline bucketed by `granularity`, per-tool
    utilization, and the ten slowest completed actions.

    Raises:
        HTTPException: 500 on database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        since_timestamp = datetime.now().timestamp() - (hours_back * 3600)

        # Overall performance metrics.  AVG/SUM are wrapped in COALESCE so
        # a window with no (completed) actions yields 0 instead of NULL --
        # NULL would fail validation of the non-optional float/int fields
        # in PerformanceOverviewStats.
        cursor.execute("""
            SELECT
                COUNT(*) as total_actions,
                COUNT(DISTINCT workflow_id) as active_workflows,
                COALESCE(AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END), 0) as avg_execution_time,
                MIN(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as min_execution_time,
                MAX(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as max_execution_time,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) as successful_actions,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) as failed_actions,
                COUNT(DISTINCT tool_name) as tools_used
            FROM actions
            WHERE started_at >= ?
        """, (since_timestamp,))

        overview_result = cursor.fetchone()
        overview_data = dict(zip([d[0] for d in cursor.description], overview_result, strict=False)) if overview_result else {}

        # Derived metrics; max(1, ...) guards the divisions for empty windows.
        success_rate = (overview_data.get('successful_actions', 0) / max(1, overview_data.get('total_actions', 1))) * 100
        throughput = overview_data.get('total_actions', 0) / max(1, hours_back)

        overview_stats = PerformanceOverviewStats(
            **overview_data,
            success_rate_percentage=success_rate,
            throughput_per_hour=throughput,
            error_rate_percentage=100 - success_rate,
            avg_workflow_size=overview_data.get('total_actions', 0) / max(1, overview_data.get('active_workflows', 1))
        )

        # Timeline bucket expression.  Interpolating it into the SQL below
        # is safe: `granularity` is restricted to minute|hour|day by the
        # Query regex, and these strings are server-defined constants.
        if granularity == 'hour':
            time_format = "strftime('%Y-%m-%d %H:00:00', datetime(started_at, 'unixepoch'))"
        elif granularity == 'minute':
            time_format = "strftime('%Y-%m-%d %H:%M:00', datetime(started_at, 'unixepoch'))"
        else:  # day
            time_format = "strftime('%Y-%m-%d', datetime(started_at, 'unixepoch'))"

        cursor.execute(f"""
            SELECT
                {time_format} as time_bucket,
                COUNT(*) as action_count,
                AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful_count,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count,
                COUNT(DISTINCT workflow_id) as workflow_count
            FROM actions
            WHERE started_at >= ?
            GROUP BY {time_format}
            ORDER BY time_bucket
        """, (since_timestamp,))

        timeline_data = [
            TimelineBucket(**dict(zip([d[0] for d in cursor.description], row, strict=False)))
            for row in cursor.fetchall()
        ]

        # Resource utilization by tool
        cursor.execute("""
            SELECT
                tool_name,
                COUNT(*) as usage_count,
                AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as success_count,
                MAX(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as max_duration
            FROM actions
            WHERE started_at >= ?
            GROUP BY tool_name
            ORDER BY usage_count DESC
        """, (since_timestamp,))

        tool_utilization = [
            ToolUtilization(**dict(zip([d[0] for d in cursor.description], row, strict=False)))
            for row in cursor.fetchall()
        ]

        # Top bottlenecks (slowest completed operations)
        cursor.execute("""
            SELECT
                tool_name,
                workflow_id,
                action_id,
                started_at,
                completed_at,
                (completed_at - started_at) as duration,
                status,
                reasoning
            FROM actions
            WHERE started_at >= ? AND completed_at IS NOT NULL
            ORDER BY duration DESC
            LIMIT 10
        """, (since_timestamp,))

        bottlenecks = [
            Bottleneck(**dict(zip([d[0] for d in cursor.description], row, strict=False)))
            for row in cursor.fetchall()
        ]

        return PerformanceOverviewResponse(
            overview=overview_stats,
            timeline=timeline_data,
            tool_utilization=tool_utilization,
            bottlenecks=bottlenecks,
            analysis_period={
                'hours_back': hours_back,
                'granularity': granularity,
                'start_time': since_timestamp,
                'end_time': datetime.now().timestamp()
            },
            timestamp=datetime.now().isoformat()
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Close the connection on all paths (previously leaked on errors).
        if conn is not None:
            conn.close()
3789 |
@app.get(
    "/performance/bottlenecks",
    response_model=BottleneckAnalysisResponse,
    tags=["Performance Profiler"],
    summary="Identify and analyze performance bottlenecks with detailed insights",
    description="""
    Perform comprehensive bottleneck analysis including:

    - **Tool performance analysis** with percentile breakdowns (P95, P99)
    - **Workflow efficiency scoring** and parallelization opportunities
    - **Resource contention detection** and conflict analysis
    - **Optimization recommendations** with impact estimates
    - **Critical path identification** for workflow optimization

    Advanced algorithms identify bottlenecks using statistical analysis and provide actionable insights.
    """,
    responses={
        200: {
            "description": "Comprehensive bottleneck analysis with optimization opportunities"
        }
    }
)
async def get_performance_bottlenecks(
    hours_back: int = Query(
        24,
        description="Hours back to analyze for bottlenecks",
        ge=1,
        le=720
    ),
    min_duration: float = Query(
        1.0,
        description="Minimum execution duration (seconds) to consider as potential bottleneck",
        ge=0.1
    )
) -> BottleneckAnalysisResponse:
    """Identify and analyze performance bottlenecks with detailed insights.

    Produces per-tool aggregates, per-workflow aggregates, parallelization
    opportunities, overlapping-tool contention, and derived recommendations
    for the last `hours_back` hours.

    Raises:
        HTTPException: 500 on database errors or unexpected failures.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        since_timestamp = datetime.now().timestamp() - (hours_back * 3600)

        # Per-tool bottleneck aggregates, computed in a single pass over
        # `actions`.  The previous version joined a duration CTE back onto
        # `actions` USING(tool_name), which paired every qualifying row with
        # every other row of the same tool and inflated all of the
        # aggregates (counts, sums, and averages).
        # Note: SQLite has no PERCENTILE_CONT, so percentiles are
        # approximated below.
        cursor.execute("""
            SELECT
                tool_name,
                COUNT(*) as total_calls,
                AVG(completed_at - started_at) as avg_duration,
                MAX(completed_at - started_at) as max_duration,
                MIN(completed_at - started_at) as min_duration,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failure_count,
                SUM(completed_at - started_at) as total_time_spent
            FROM actions
            WHERE started_at >= ?
              AND completed_at IS NOT NULL
              AND (completed_at - started_at) >= ?
            GROUP BY tool_name
            ORDER BY avg_duration DESC
        """, (since_timestamp, min_duration))

        tool_bottlenecks = []
        for row in cursor.fetchall():
            data = dict(zip([d[0] for d in cursor.description], row, strict=False))
            # Approximate percentiles (in production, compute from the raw
            # duration distribution instead).
            data['p95_duration'] = data['avg_duration'] * 1.5  # Approximation
            data['p99_duration'] = data['avg_duration'] * 2.0  # Approximation
            tool_bottlenecks.append(ToolBottleneck(**data))

        # Identify workflow bottlenecks (multi-action workflows only)
        cursor.execute("""
            SELECT
                w.workflow_id,
                w.title,
                COUNT(a.action_id) as action_count,
                AVG(a.completed_at - a.started_at) as avg_action_duration,
                MAX(a.completed_at - a.started_at) as max_action_duration,
                SUM(a.completed_at - a.started_at) as total_workflow_time,
                MIN(a.started_at) as workflow_start,
                MAX(a.completed_at) as workflow_end,
                (MAX(a.completed_at) - MIN(a.started_at)) as total_elapsed_time
            FROM workflows w
            JOIN actions a ON w.workflow_id = a.workflow_id
            WHERE a.started_at >= ? AND a.completed_at IS NOT NULL
            GROUP BY w.workflow_id, w.title
            HAVING COUNT(a.action_id) > 1
            ORDER BY total_workflow_time DESC
            LIMIT 20
        """, (since_timestamp,))

        workflow_bottlenecks = [
            WorkflowBottleneck(**dict(zip([d[0] for d in cursor.description], row, strict=False)))
            for row in cursor.fetchall()
        ]

        # Parallelization: compare the sum of action durations with the
        # workflow's wall-clock span.
        cursor.execute("""
            SELECT
                workflow_id,
                COUNT(*) as sequential_actions,
                SUM(completed_at - started_at) as total_sequential_time,
                (MAX(completed_at) - MIN(started_at)) as actual_elapsed_time
            FROM actions
            WHERE started_at >= ? AND completed_at IS NOT NULL
            GROUP BY workflow_id
            HAVING COUNT(*) > 2
        """, (since_timestamp,))

        parallelization_opportunities = []
        for row in cursor.fetchall():
            data = dict(zip([d[0] for d in cursor.description], row, strict=False))
            potential_savings = data['total_sequential_time'] - data['actual_elapsed_time']
            if potential_savings > 0:
                elapsed = data['actual_elapsed_time']
                # total_sequential_time > elapsed >= 0 here, so dividing by
                # it is safe; elapsed itself can still be 0 (all actions
                # recorded as instantaneous), so guard the score division.
                parallelization_opportunities.append(ParallelizationOpportunity(
                    **data,
                    potential_time_savings=potential_savings,
                    parallelization_efficiency=(elapsed / data['total_sequential_time']) * 100,
                    optimization_score=min(10, potential_savings / elapsed * 10) if elapsed > 0 else 10
                ))

        # Resource contention: actions whose execution windows overlapped
        # another action of the same tool.
        cursor.execute("""
            SELECT
                tool_name,
                COUNT(*) as concurrent_usage,
                AVG(completed_at - started_at) as avg_duration_under_contention
            FROM actions a1
            WHERE started_at >= ? AND EXISTS (
                SELECT 1 FROM actions a2
                WHERE a2.tool_name = a1.tool_name
                AND a2.action_id != a1.action_id
                AND a2.started_at <= a1.completed_at
                AND a2.completed_at >= a1.started_at
            )
            GROUP BY tool_name
            ORDER BY concurrent_usage DESC
        """, (since_timestamp,))

        resource_contention = [
            ResourceContention(**dict(zip([d[0] for d in cursor.description], row, strict=False)))
            for row in cursor.fetchall()
        ]

        # Generate optimization recommendations
        recommendations = []

        # Tool-based recommendations (top 5 slowest tools)
        for tool in tool_bottlenecks[:5]:
            if tool.avg_duration > 10:  # More than 10 seconds average
                recommendations.append(OptimizationRecommendation(
                    type='tool_optimization',
                    priority='high' if tool.avg_duration > 30 else 'medium',
                    title=f"Optimize {tool.tool_name} performance",
                    description=f"Tool {tool.tool_name} has high average duration of {tool.avg_duration:.2f}s",
                    impact=f"Could save ~{tool.total_time_spent * 0.3:.2f}s per execution period",
                    actions=[
                        'Review tool implementation for optimization opportunities',
                        'Consider caching strategies for repeated operations',
                        'Evaluate if tool can be replaced with faster alternative'
                    ]
                ))

        # Parallelization recommendations (top 3 by potential savings)
        for opp in sorted(parallelization_opportunities, key=lambda x: x.potential_time_savings, reverse=True)[:3]:
            recommendations.append(OptimizationRecommendation(
                type='parallelization',
                priority='high' if opp.potential_time_savings > 20 else 'medium',
                title=f"Parallelize workflow {opp.workflow_id}",
                description=f"Workflow could save {opp.potential_time_savings:.2f}s through parallel execution",
                impact=f"Up to {opp.parallelization_efficiency:.1f}% efficiency improvement",
                actions=[
                    'Analyze action dependencies to identify parallelizable segments',
                    'Implement async execution where possible',
                    'Consider workflow restructuring for better parallelization'
                ]
            ))

        return BottleneckAnalysisResponse(
            tool_bottlenecks=tool_bottlenecks,
            workflow_bottlenecks=workflow_bottlenecks,
            parallelization_opportunities=parallelization_opportunities,
            resource_contention=resource_contention,
            recommendations=recommendations,
            analysis_summary={
                'total_bottlenecks_identified': len(tool_bottlenecks) + len(workflow_bottlenecks),
                'highest_impact_tool': tool_bottlenecks[0].tool_name if tool_bottlenecks else None,
                'avg_tool_duration': sum(t.avg_duration for t in tool_bottlenecks) / len(tool_bottlenecks) if tool_bottlenecks else 0,
                'parallelization_potential': len(parallelization_opportunities)
            },
            timestamp=datetime.now().isoformat()
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
    finally:
        # Close the connection on all paths (previously leaked on errors).
        if conn is not None:
            conn.close()
3996 |
3997 | # ---------- Flame Graph Helper Functions ----------
3998 |
def build_flame_graph_structure(actions: List[Dict], workflow_id: str) -> Dict:
    """Build a hierarchical flame-graph dict (workflow -> tool -> action).

    Node weights (``value``) are summed durations; actions without a truthy
    ``duration`` contribute nothing to the sums and are omitted from the
    leaf level.
    """
    def _duration_sum(items):
        # Sum only entries that actually carry a (truthy) duration.
        return sum(item.get('duration', 0) for item in items if item.get('duration'))

    # Group actions by the tool that executed them (insertion order kept).
    by_tool = {}
    for action in actions:
        by_tool.setdefault(action.get('tool_name', 'unknown'), []).append(action)

    root = {
        'name': f'Workflow {workflow_id}',
        'value': _duration_sum(actions),
        'children': []
    }

    # One child node per tool, with its timed actions as leaves.
    for tool_name, tool_actions in by_tool.items():
        leaves = [
            {
                'name': f"Action {action['action_id']}",
                'value': action['duration'],
                'action_id': action['action_id'],
                'status': action.get('status'),
                'reasoning': action.get('reasoning', ''),
                'started_at': action.get('started_at'),
                'completed_at': action.get('completed_at')
            }
            for action in tool_actions
            if action.get('duration')
        ]
        root['children'].append({
            'name': tool_name,
            'value': _duration_sum(tool_actions),
            'children': leaves
        })

    return root
4044 |
def calculate_critical_path(actions: List[Dict]) -> List[Dict]:
    """Walk the workflow timeline and return the chain of actions on the
    critical path.

    At each point in time the "critical" action is the one still running
    that finishes latest; the walk then jumps to that action's completion
    time (or to the next action's start when there is a gap with nothing
    running).

    Args:
        actions: Action dicts with at least ``action_id`` plus
            ``started_at``/``completed_at`` timestamps for timed actions.

    Returns:
        Ordered list of ``{action_id, tool_name, duration, start_time,
        end_time}`` dicts; empty when no timed actions exist.
    """
    if not actions:
        return []

    # Sort actions by start time
    sorted_actions = sorted(actions, key=lambda x: x.get('started_at', 0))

    # Bail out early when no action carries usable timestamps — the
    # original min()/max() calls raised ValueError on empty generators.
    start_times = [a['started_at'] for a in sorted_actions if a.get('started_at')]
    end_times = [a['completed_at'] for a in sorted_actions if a.get('completed_at')]
    if not start_times or not end_times:
        return []

    critical_path: List[Dict] = []
    seen_action_ids = set()
    current_time = min(start_times)
    workflow_end = max(end_times)

    while current_time < workflow_end:
        # Actions whose execution window covers current_time
        running_actions = [
            a for a in sorted_actions
            if a.get('started_at', 0) <= current_time and a.get('completed_at', 0) > current_time
        ]

        if running_actions:
            # The action that ends latest dominates the timeline here.
            critical_action = max(running_actions, key=lambda x: x.get('completed_at', 0))
            # Dedupe by action_id: the original compared the whole action
            # dict against a list of id strings, so the guard never fired.
            if critical_action['action_id'] not in seen_action_ids:
                seen_action_ids.add(critical_action['action_id'])
                critical_path.append({
                    'action_id': critical_action['action_id'],
                    'tool_name': critical_action.get('tool_name'),
                    'duration': critical_action.get('duration', 0),
                    'start_time': critical_action.get('started_at'),
                    'end_time': critical_action.get('completed_at')
                })
            current_time = critical_action.get('completed_at', current_time + 1)
        else:
            # Gap with nothing running: jump to the next action start.
            future_actions = [a for a in sorted_actions if a.get('started_at', 0) > current_time]
            if not future_actions:
                break
            current_time = min(a['started_at'] for a in future_actions)

    return critical_path
4085 |
4086 | # ---------- Flame Graph Pydantic Models ----------
4087 |
class FlameGraphNode(BaseModel):
    """Model for a flame graph node.

    Recursive node of the hierarchical flame graph: the root is the workflow,
    its children are tools, and their children are individual actions.
    ``value`` carries the summed duration so renderers can size frames.
    Only action-level nodes populate the optional metadata fields.
    """
    name: str = Field(..., description="Name of the node (workflow, tool, or action)")
    value: float = Field(..., description="Duration in seconds")
    children: List['FlameGraphNode'] = Field(default_factory=list, description="Child nodes")
    action_id: Optional[str] = Field(None, description="Action ID if this is an action node")
    status: Optional[str] = Field(None, description="Execution status")
    reasoning: Optional[str] = Field(None, description="Reasoning for the action")
    started_at: Optional[float] = Field(None, description="Start timestamp")
    completed_at: Optional[float] = Field(None, description="Completion timestamp")

FlameGraphNode.model_rebuild()  # Resolve the self-referential 'FlameGraphNode' forward reference
4100 |
class CriticalPathAction(BaseModel):
    """Model for a critical path action.

    One hop on the workflow's critical path, built directly from the dicts
    produced by ``calculate_critical_path`` (timestamps are unix epoch seconds).
    """
    action_id: str = Field(..., description="Action identifier")
    tool_name: str = Field(..., description="Tool used for the action")
    duration: float = Field(..., description="Duration in seconds")
    start_time: float = Field(..., description="Start timestamp")
    end_time: float = Field(..., description="End timestamp")
4108 |
class WorkflowMetrics(BaseModel):
    """Workflow performance metrics.

    Aggregate timing summary for one workflow, populated by the
    flame-graph endpoint from the per-action durations.
    """
    total_actions: int = Field(..., description="Total number of actions in workflow")
    total_cpu_time: float = Field(..., description="Total CPU time (sum of all action durations)")
    wall_clock_time: float = Field(..., description="Total wall clock time from start to end")
    parallelization_efficiency: float = Field(..., description="Efficiency percentage (0-100)")
    avg_action_duration: float = Field(..., description="Average duration per action")
    workflow_start: float = Field(..., description="Workflow start timestamp")
    workflow_end: float = Field(..., description="Workflow end timestamp")
4118 |
class WorkflowAnalysis(BaseModel):
    """Analysis results for workflow optimization.

    High-level optimization signals derived from per-tool duration totals
    and the CPU-vs-wall-clock gap.
    """
    bottleneck_tool: Optional[str] = Field(None, description="Tool causing the main bottleneck")
    parallelization_potential: float = Field(..., description="Potential time savings through parallelization")
    optimization_score: float = Field(..., description="Overall optimization score (0-10)")
4124 |
class FlameGraphResponse(BaseModel):
    """Response model for flame graph generation.

    Top-level payload of GET /performance/flame-graph, combining the
    hierarchical graph with metrics, the critical path and analysis.
    """
    flame_graph: FlameGraphNode = Field(..., description="Hierarchical flame graph data")
    metrics: WorkflowMetrics = Field(..., description="Workflow performance metrics")
    critical_path: List[CriticalPathAction] = Field(..., description="Critical path through the workflow")
    analysis: WorkflowAnalysis = Field(..., description="Workflow optimization analysis")
    timestamp: str = Field(..., description="Response generation timestamp")
4132 |
4133 | # ---------- Performance Trends Pydantic Models ----------
4134 |
class DailyTrend(BaseModel):
    """Model for daily performance metrics.

    One row per calendar day, aggregated from the actions table by the
    trends endpoint (success/error rates are percentages, throughput
    assumes a 24-hour day).
    """
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    action_count: int = Field(..., description="Number of actions executed")
    avg_duration: Optional[float] = Field(None, description="Average action duration in seconds")
    success_rate: float = Field(..., description="Success rate percentage (0-100)")
    throughput: float = Field(..., description="Actions per hour")
    error_rate: float = Field(..., description="Error rate percentage (0-100)")
    successful_actions: int = Field(..., description="Number of successful actions")
    failed_actions: int = Field(..., description="Number of failed actions")
    workflow_count: int = Field(..., description="Number of unique workflows")
    tool_count: int = Field(..., description="Number of unique tools used")
4147 |
class ToolTrend(BaseModel):
    """Model for tool-specific performance trends.

    One row per (tool, day) pair; ``avg_duration`` is None for days where
    no execution of the tool completed.
    """
    tool_name: str = Field(..., description="Name of the tool")
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    usage_count: int = Field(..., description="Number of times used")
    avg_duration: Optional[float] = Field(None, description="Average execution duration")
    success_count: int = Field(..., description="Number of successful executions")
4155 |
class WorkflowComplexityTrend(BaseModel):
    """Model for workflow complexity trends.

    One row per (day, workflow) pair; ``total_duration`` sums action
    durations while ``elapsed_time`` is the wall-clock span.
    """
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    workflow_id: str = Field(..., description="Workflow identifier")
    action_count: int = Field(..., description="Number of actions in workflow")
    total_duration: Optional[float] = Field(None, description="Total workflow duration")
    elapsed_time: Optional[float] = Field(None, description="Wall clock time")
4163 |
class Pattern(BaseModel):
    """Detected performance pattern.

    Emitted by the trends endpoint for weekly weekday/weekend splits
    ('weekly_pattern') and simple outlier days ('performance_anomaly');
    ``date`` is only set for anomalies.
    """
    type: str = Field(..., description="Type of pattern detected")
    description: str = Field(..., description="Description of the pattern")
    impact: str = Field(..., description="Impact level (high/medium/low)")
    recommendation: str = Field(..., description="Recommended action")
    date: Optional[str] = Field(None, description="Date of occurrence for anomalies")
4171 |
class TrendAnalysis(BaseModel):
    """Trend analysis results.

    Direction labels are derived by comparing the first and last three
    daily data points; fewer than two days yields 'insufficient_data'.
    """
    performance_trend: str = Field(..., description="Overall performance trend (improving/degrading/stable/insufficient_data)")
    success_trend: str = Field(..., description="Success rate trend (improving/degrading/stable/insufficient_data)")
    data_points: int = Field(..., description="Number of data points analyzed")
    analysis_period_days: int = Field(..., description="Analysis period in days")
4178 |
class InsightMetrics(BaseModel):
    """Performance insight metrics.

    Best/worst days are selected by success rate, peak day by throughput;
    all three are None when no daily data exists.
    """
    best_performing_day: Optional[DailyTrend] = Field(None, description="Day with best performance")
    worst_performing_day: Optional[DailyTrend] = Field(None, description="Day with worst performance")
    peak_throughput_day: Optional[DailyTrend] = Field(None, description="Day with highest throughput")
    avg_daily_actions: float = Field(..., description="Average actions per day")
4185 |
class PerformanceTrendsResponse(BaseModel):
    """Response model for performance trends analysis.

    Top-level payload of GET /performance/trends combining daily, per-tool
    and per-workflow series with pattern detection and insights.
    """
    daily_trends: List[DailyTrend] = Field(..., description="Daily performance metrics")
    tool_trends: List[ToolTrend] = Field(..., description="Tool-specific performance trends")
    workflow_complexity: List[WorkflowComplexityTrend] = Field(..., description="Workflow complexity trends")
    trend_analysis: TrendAnalysis = Field(..., description="Overall trend analysis")
    patterns: List[Pattern] = Field(..., description="Detected performance patterns")
    insights: InsightMetrics = Field(..., description="Key performance insights")
    timestamp: str = Field(..., description="Response generation timestamp")
4195 |
4196 | # ---------- Advanced Performance Profiler Endpoints ----------
4197 |
@app.get(
    "/performance/flame-graph",
    response_model=FlameGraphResponse,
    tags=["Performance Profiler"],
    summary="Generate flame graph data for workflow performance visualization",
    description="""
    Generate hierarchical flame graph data for detailed workflow performance analysis:

    - **Interactive flame graph structure** showing execution hierarchy
    - **Critical path analysis** highlighting the longest dependency chain
    - **Tool-level performance breakdown** with execution times
    - **Parallelization efficiency metrics** and optimization scores
    - **Execution timeline analysis** with CPU vs wall-clock time

    Industry-standard flame graph visualization for profiling workflow execution patterns.
    """,
    responses={
        200: {
            "description": "Flame graph data with performance metrics and critical path",
            "content": {
                "application/json": {
                    "example": {
                        "flame_graph": {
                            "name": "Workflow workflow_abc123",
                            "value": 145.5,
                            "children": [
                                {
                                    "name": "smart_browser",
                                    "value": 85.3,
                                    "children": [
                                        {
                                            "name": "Action act_001",
                                            "value": 45.2,
                                            "action_id": "act_001",
                                            "status": "completed",
                                            "reasoning": "Navigate to documentation site",
                                            "started_at": 1703980800.0,
                                            "completed_at": 1703980845.2
                                        }
                                    ]
                                },
                                {
                                    "name": "execute_python",
                                    "value": 60.2,
                                    "children": []
                                }
                            ]
                        },
                        "metrics": {
                            "total_actions": 5,
                            "total_cpu_time": 145.5,
                            "wall_clock_time": 98.7,
                            "parallelization_efficiency": 67.8,
                            "avg_action_duration": 29.1,
                            "workflow_start": 1703980800.0,
                            "workflow_end": 1703980898.7
                        },
                        "critical_path": [
                            {
                                "action_id": "act_001",
                                "tool_name": "smart_browser",
                                "duration": 45.2,
                                "start_time": 1703980800.0,
                                "end_time": 1703980845.2
                            }
                        ],
                        "analysis": {
                            "bottleneck_tool": "smart_browser",
                            "parallelization_potential": 46.8,
                            "optimization_score": 6.8
                        },
                        "timestamp": "2024-01-01T00:00:00Z"
                    }
                }
            }
        },
        400: {
            "description": "Missing required workflow_id parameter",
            "content": {
                "application/json": {
                    "example": {"detail": "workflow_id parameter is required"}
                }
            }
        },
        404: {
            "description": "No actions found for specified workflow",
            "content": {
                "application/json": {
                    "example": {"detail": "No actions found for workflow 'workflow_abc123'"}
                }
            }
        },
        500: {
            "description": "Internal server error"
        }
    }
)
async def get_performance_flame_graph(
    workflow_id: str = Query(
        ...,
        description="Workflow ID to generate flame graph for",
        example="workflow_abc123",
        regex="^[a-zA-Z0-9_-]+$"
    ),
    hours_back: int = Query(
        24,
        description="Hours back to search for workflow execution data",
        ge=1,
        le=720,  # Max 30 days
        example=24
    )
) -> FlameGraphResponse:
    """Generate flame graph data for workflow performance visualization.

    Loads all actions recorded for ``workflow_id`` within the last
    ``hours_back`` hours, builds the hierarchical flame graph, and derives
    timing metrics, the critical path and a simple optimization analysis.

    Raises:
        HTTPException: 404 when the workflow has no actions in the window;
            500 on database or unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            since_timestamp = datetime.now().timestamp() - (hours_back * 3600)

            # Get workflow actions with timing data (duration is NULL for
            # actions that have not completed yet)
            cursor.execute("""
                SELECT
                    action_id,
                    tool_name,
                    started_at,
                    completed_at,
                    (completed_at - started_at) as duration,
                    status,
                    reasoning,
                    summary,
                    dependency_path
                FROM actions
                WHERE workflow_id = ? AND started_at >= ?
                ORDER BY started_at
            """, (workflow_id, since_timestamp))

            actions = [dict(zip([d[0] for d in cursor.description], row, strict=False)) for row in cursor.fetchall()]

            if not actions:
                raise HTTPException(
                    status_code=404,
                    detail=f"No actions found for workflow '{workflow_id}'"
                )

            # Build flame graph structure
            flame_graph_data = build_flame_graph_structure(actions, workflow_id)

            # Calculate performance metrics; in-flight actions (duration None)
            # are excluded from the CPU-time sum.
            total_duration = sum(action['duration'] for action in actions if action.get('duration'))
            start_times = [a['started_at'] for a in actions if a.get('started_at')]
            end_times = [a['completed_at'] for a in actions if a.get('completed_at')]
            # BUG FIX: min()/max() on an empty sequence raised ValueError (and
            # surfaced as a 500) when no action had completed yet.
            workflow_start = min(start_times) if start_times else 0
            workflow_end = max(end_times) if end_times else 0
            wall_clock_time = workflow_end - workflow_start if workflow_end and workflow_start else 0

            # Parallelization efficiency (wall-clock as a share of CPU time)
            parallelization_efficiency = (wall_clock_time / total_duration * 100) if total_duration > 0 else 0

            # Critical path analysis
            critical_path = calculate_critical_path(actions)

            # Find bottleneck tool: the tool with the largest cumulative duration
            tool_durations = {}
            for action in actions:
                tool_name = action.get('tool_name', 'unknown')
                # BUG FIX: .get('duration', 0) returns None when the key is
                # present with a NULL value, which broke the += below.
                duration = action.get('duration') or 0
                if tool_name not in tool_durations:
                    tool_durations[tool_name] = 0
                tool_durations[tool_name] += duration

            bottleneck_tool = max(tool_durations.keys(), key=lambda t: tool_durations[t]) if tool_durations else None

            # Calculate optimization potential
            parallelization_potential = max(0, total_duration - wall_clock_time)
            optimization_score = min(10, parallelization_efficiency / 10)

            # Convert flame graph dicts to the recursive Pydantic model
            def convert_to_model(node: Dict) -> FlameGraphNode:
                return FlameGraphNode(
                    name=node['name'],
                    value=node['value'],
                    children=[convert_to_model(child) for child in node.get('children', [])],
                    action_id=node.get('action_id'),
                    status=node.get('status'),
                    reasoning=node.get('reasoning'),
                    started_at=node.get('started_at'),
                    completed_at=node.get('completed_at')
                )

            return FlameGraphResponse(
                flame_graph=convert_to_model(flame_graph_data),
                metrics=WorkflowMetrics(
                    total_actions=len(actions),
                    total_cpu_time=total_duration,
                    wall_clock_time=wall_clock_time,
                    parallelization_efficiency=parallelization_efficiency,
                    avg_action_duration=total_duration / len(actions) if actions else 0,
                    workflow_start=workflow_start,
                    workflow_end=workflow_end
                ),
                critical_path=[CriticalPathAction(**cp) for cp in critical_path],
                analysis=WorkflowAnalysis(
                    bottleneck_tool=bottleneck_tool,
                    parallelization_potential=parallelization_potential,
                    optimization_score=optimization_score
                ),
                timestamp=datetime.now().isoformat()
            )
        finally:
            # BUG FIX: the connection was previously leaked on the 404 path and
            # on any database/unexpected error; always close it.
            conn.close()

    except HTTPException:
        raise
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
4413 |
@app.get(
    "/performance/trends",
    response_model=PerformanceTrendsResponse,
    tags=["Performance Profiler"],
    summary="Analyze performance trends and patterns over time",
    description="""
    Comprehensive trend analysis for long-term performance monitoring:

    - **Daily performance trends** with configurable time periods
    - **Pattern detection algorithms** identifying weekly patterns and anomalies
    - **Trend classification** (improving, degrading, stable) with confidence scores
    - **Performance insights** with contextual explanations
    - **Comparative analysis** showing best/worst performing periods

    Advanced analytics help identify performance degradation and optimization opportunities over time.
    """,
    responses={
        200: {
            "description": "Performance trends with pattern analysis and insights",
            "content": {
                "application/json": {
                    "example": {
                        "daily_trends": [
                            {
                                "date": "2024-01-01",
                                "action_count": 150,
                                "avg_duration": 25.5,
                                "success_rate": 92.5,
                                "throughput": 6.25,
                                "error_rate": 7.5,
                                "successful_actions": 139,
                                "failed_actions": 11,
                                "workflow_count": 15,
                                "tool_count": 8
                            }
                        ],
                        "tool_trends": [
                            {
                                "tool_name": "smart_browser",
                                "date": "2024-01-01",
                                "usage_count": 45,
                                "avg_duration": 35.2,
                                "success_count": 42
                            }
                        ],
                        "workflow_complexity": [],
                        "trend_analysis": {
                            "performance_trend": "improving",
                            "success_trend": "stable",
                            "data_points": 7,
                            "analysis_period_days": 7
                        },
                        "patterns": [
                            {
                                "type": "weekly_pattern",
                                "description": "Performance varies significantly between weekdays (25.5s) and weekends (35.2s)",
                                "impact": "medium",
                                "recommendation": "Consider different optimization strategies for weekend vs weekday operations"
                            }
                        ],
                        "insights": {
                            "best_performing_day": {
                                "date": "2024-01-03",
                                "action_count": 180,
                                "avg_duration": 22.3,
                                "success_rate": 95.5
                            },
                            "avg_daily_actions": 150.5
                        },
                        "timestamp": "2024-01-07T12:00:00Z"
                    }
                }
            }
        },
        500: {
            "description": "Internal server error"
        }
    }
)
async def get_performance_trends(
    days_back: int = Query(
        7,
        description="Number of days back to analyze trends",
        ge=1,
        le=90,  # Max 3 months
        example=7
    ),
    metric: str = Query(
        "duration",
        description="Primary metric to analyze for trends",
        regex="^(duration|success_rate|throughput)$",
        example="duration"
    )
) -> PerformanceTrendsResponse:
    """Analyze performance trends and patterns over time.

    Aggregates the actions table into daily, per-tool and per-workflow
    series, classifies the overall direction of performance and success
    rates, and flags weekly patterns and outlier days.

    Note:
        ``metric`` is validated but not yet used to select the analyzed
        series; all series are always returned.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            since_timestamp = datetime.now().timestamp() - (days_back * 24 * 3600)

            # Daily trends
            cursor.execute("""
                SELECT
                    DATE(datetime(started_at, 'unixepoch')) as date,
                    COUNT(*) as action_count,
                    AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration,
                    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful_actions,
                    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_actions,
                    COUNT(DISTINCT workflow_id) as workflow_count,
                    COUNT(DISTINCT tool_name) as tool_count
                FROM actions
                WHERE started_at >= ?
                GROUP BY DATE(datetime(started_at, 'unixepoch'))
                ORDER BY date
            """, (since_timestamp,))

            daily_trends = []
            for row in cursor.fetchall():
                date, action_count, avg_duration, successful_actions, failed_actions, workflow_count, tool_count = row

                success_rate = (successful_actions / max(1, action_count)) * 100
                throughput = action_count / 24  # actions per hour
                error_rate = (failed_actions / max(1, action_count)) * 100

                daily_trends.append(DailyTrend(
                    date=date,
                    action_count=action_count,
                    avg_duration=avg_duration,
                    success_rate=success_rate,
                    throughput=throughput,
                    error_rate=error_rate,
                    successful_actions=successful_actions,
                    failed_actions=failed_actions,
                    workflow_count=workflow_count,
                    tool_count=tool_count
                ))

            # Tool performance trends
            cursor.execute("""
                SELECT
                    tool_name,
                    DATE(datetime(started_at, 'unixepoch')) as date,
                    COUNT(*) as usage_count,
                    AVG(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as avg_duration,
                    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as success_count
                FROM actions
                WHERE started_at >= ?
                GROUP BY tool_name, DATE(datetime(started_at, 'unixepoch'))
                ORDER BY tool_name, date
            """, (since_timestamp,))

            tool_trends = [
                ToolTrend(
                    tool_name=row[0],
                    date=row[1],
                    usage_count=row[2],
                    avg_duration=row[3],
                    success_count=row[4]
                )
                for row in cursor.fetchall()
            ]

            # Workflow complexity trends
            cursor.execute("""
                SELECT
                    DATE(datetime(started_at, 'unixepoch')) as date,
                    workflow_id,
                    COUNT(*) as action_count,
                    SUM(CASE WHEN completed_at IS NOT NULL THEN completed_at - started_at ELSE NULL END) as total_duration,
                    (MAX(completed_at) - MIN(started_at)) as elapsed_time
                FROM actions
                WHERE started_at >= ? AND workflow_id IS NOT NULL
                GROUP BY DATE(datetime(started_at, 'unixepoch')), workflow_id
                ORDER BY date, workflow_id
            """, (since_timestamp,))

            workflow_complexity = [
                WorkflowComplexityTrend(
                    date=row[0],
                    workflow_id=row[1],
                    action_count=row[2],
                    total_duration=row[3],
                    elapsed_time=row[4]
                )
                for row in cursor.fetchall()
            ]

            # Calculate trend analysis by comparing the first and last (up to)
            # three daily data points
            if len(daily_trends) >= 2:
                recent_avg = sum(d.avg_duration or 0 for d in daily_trends[-3:]) / min(3, len(daily_trends))
                earlier_avg = sum(d.avg_duration or 0 for d in daily_trends[:3]) / min(3, len(daily_trends))

                # 10% band around the earlier average counts as 'stable'
                if recent_avg > earlier_avg * 1.1:
                    performance_trend = 'degrading'
                elif recent_avg < earlier_avg * 0.9:
                    performance_trend = 'improving'
                else:
                    performance_trend = 'stable'

                # Success rate trend
                recent_success = sum(d.success_rate for d in daily_trends[-3:]) / min(3, len(daily_trends))
                earlier_success = sum(d.success_rate for d in daily_trends[:3]) / min(3, len(daily_trends))

                success_trend = 'improving' if recent_success > earlier_success else 'degrading' if recent_success < earlier_success else 'stable'
            else:
                performance_trend = 'insufficient_data'
                success_trend = 'insufficient_data'

            # Identify performance patterns
            patterns = []

            # Weekly pattern detection: weekday vs weekend split (>20% gap)
            if len(daily_trends) >= 7:
                weekend_performance = [d for d in daily_trends if datetime.strptime(d.date, '%Y-%m-%d').weekday() >= 5]
                weekday_performance = [d for d in daily_trends if datetime.strptime(d.date, '%Y-%m-%d').weekday() < 5]

                if weekend_performance and weekday_performance:
                    weekend_avg = sum(d.avg_duration or 0 for d in weekend_performance) / len(weekend_performance)
                    weekday_avg = sum(d.avg_duration or 0 for d in weekday_performance) / len(weekday_performance)

                    if abs(weekend_avg - weekday_avg) > weekday_avg * 0.2:
                        patterns.append(Pattern(
                            type='weekly_pattern',
                            description=f"Performance varies significantly between weekdays ({weekday_avg:.2f}s) and weekends ({weekend_avg:.2f}s)",
                            impact='medium',
                            recommendation='Consider different optimization strategies for weekend vs weekday operations'
                        ))

            # Anomaly detection (simple outlier detection: >50% deviation from mean)
            if daily_trends:
                durations = [d.avg_duration or 0 for d in daily_trends]
                mean_duration = sum(durations) / len(durations)

                outliers = [d for d in daily_trends if abs((d.avg_duration or 0) - mean_duration) > mean_duration * 0.5]

                for outlier in outliers:
                    patterns.append(Pattern(
                        type='performance_anomaly',
                        date=outlier.date,
                        # BUG FIX: avg_duration can be None for an outlier day
                        # (None is treated as 0 above); formatting None with
                        # :.2f raised TypeError.
                        description=f"Unusual performance on {outlier.date}: {(outlier.avg_duration or 0):.2f}s vs normal {mean_duration:.2f}s",
                        impact='high' if abs((outlier.avg_duration or 0) - mean_duration) > mean_duration else 'medium',
                        recommendation='Investigate system conditions and workload on this date'
                    ))

            # Generate insights
            best_day = max(daily_trends, key=lambda x: x.success_rate) if daily_trends else None
            worst_day = min(daily_trends, key=lambda x: x.success_rate) if daily_trends else None
            peak_throughput_day = max(daily_trends, key=lambda x: x.throughput) if daily_trends else None
            avg_daily_actions = sum(d.action_count for d in daily_trends) / len(daily_trends) if daily_trends else 0

            return PerformanceTrendsResponse(
                daily_trends=daily_trends,
                tool_trends=tool_trends,
                workflow_complexity=workflow_complexity,
                trend_analysis=TrendAnalysis(
                    performance_trend=performance_trend,
                    success_trend=success_trend,
                    data_points=len(daily_trends),
                    analysis_period_days=days_back
                ),
                patterns=patterns,
                insights=InsightMetrics(
                    best_performing_day=best_day,
                    worst_performing_day=worst_day,
                    peak_throughput_day=peak_throughput_day,
                    avg_daily_actions=avg_daily_actions
                ),
                timestamp=datetime.now().isoformat()
            )
        finally:
            # BUG FIX: the connection was previously leaked on every error
            # path (close only happened on success); always close it.
            conn.close()

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
4692 |
4693 | # ---------- Performance Recommendations Helper Functions ----------
4694 |
def calculate_tool_reliability_score(tool_stats: dict) -> float:
    """Score a tool's reliability on a 0-100 scale.

    The score is the success ratio weighted by a call-volume factor, so a
    tool needs both a high success rate and a meaningful sample size
    (100+ calls) to reach the top of the scale.

    Args:
        tool_stats: Stats dict with ``total_calls`` and ``successful_calls``.

    Returns:
        Reliability score rounded to two decimals; 0.0 when no calls exist.
    """
    calls = tool_stats.get('total_calls', 0)
    if calls == 0:
        return 0.0

    successes = tool_stats.get('successful_calls', 0)
    success_ratio = successes / calls
    # Volume weighting: full confidence only once the tool has 100+ calls.
    volume_weight = min(1.0, calls / 100)

    return round(success_ratio * volume_weight * 100, 2)
4707 |
def categorize_tool_performance(avg_execution_time: float) -> str:
    """Bucket a tool's average execution time into a speed category.

    Args:
        avg_execution_time: Average execution time in seconds, or None when
            no timing data is available.

    Returns:
        One of 'unknown', 'fast' (<=5s), 'normal' (<=15s), 'slow' (<=30s)
        or 'very_slow' (>30s).
    """
    if avg_execution_time is None:
        return 'unknown'

    # Ordered thresholds: the first bucket whose upper bound covers the
    # value wins; anything above the last bound is 'very_slow'.
    for upper_bound, category in ((5, 'fast'), (15, 'normal'), (30, 'slow')):
        if avg_execution_time <= upper_bound:
            return category
    return 'very_slow'
4721 |
4722 | # ---------- Performance Recommendations Pydantic Models ----------
4723 |
class ImpactEstimate(BaseModel):
    """Model for recommendation impact estimates.

    Quantified benefit of applying one recommendation; the optional fields
    are only populated for recommendation types where they apply.
    """
    time_savings_potential: float = Field(..., description="Estimated time savings in seconds")
    affected_actions: int = Field(..., description="Number of actions that would benefit")
    cost_benefit_ratio: float = Field(..., description="Ratio of benefit to implementation cost")
    affected_workflows: Optional[int] = Field(None, description="Number of affected workflows")
    efficiency_improvement: Optional[float] = Field(None, description="Percentage efficiency improvement")
    reliability_improvement: Optional[float] = Field(None, description="Percentage reliability improvement")
    user_experience_impact: Optional[str] = Field(None, description="Impact on user experience (high/medium/low)")
4733 |
class PerformanceRecommendation(BaseModel):
    """Model for a single performance recommendation.

    One actionable optimization item with its estimated impact,
    implementation guidance and the metrics to verify it afterwards.
    """
    id: str = Field(..., description="Unique identifier for the recommendation")
    type: str = Field(..., description="Type of recommendation (tool_optimization, parallelization, reliability_improvement)")
    priority: str = Field(..., description="Priority level (high, medium, low)")
    title: str = Field(..., description="Brief title of the recommendation")
    description: str = Field(..., description="Detailed description of the issue and recommendation")
    impact_estimate: ImpactEstimate = Field(..., description="Estimated impact of implementing this recommendation")
    implementation_steps: List[str] = Field(..., description="Step-by-step implementation guide")
    estimated_effort: str = Field(..., description="Estimated implementation effort (low, medium, high)")
    prerequisites: List[str] = Field(..., description="Prerequisites for implementation")
    metrics_to_track: List[str] = Field(..., description="Metrics to track after implementation")
4746 |
class RecommendationSummary(BaseModel):
    """Summary statistics for recommendations.

    Priority counts and aggregate savings across the full recommendation
    list returned by the recommendations endpoint.
    """
    total_recommendations: int = Field(..., description="Total number of recommendations generated")
    high_priority: int = Field(..., description="Number of high priority recommendations")
    medium_priority: int = Field(..., description="Number of medium priority recommendations")
    low_priority: int = Field(..., description="Number of low priority recommendations")
    estimated_total_savings: float = Field(..., description="Total estimated time savings in seconds")
    analysis_period_hours: int = Field(..., description="Hours of data analyzed")
4755 |
class ImplementationRoadmap(BaseModel):
    """Categorized implementation roadmap.

    The same recommendations regrouped by implementation strategy so
    callers can sequence work (quick wins first, then major efforts).
    """
    quick_wins: List[PerformanceRecommendation] = Field(..., description="Low effort, high impact recommendations")
    major_improvements: List[PerformanceRecommendation] = Field(..., description="High effort, high impact recommendations")
    maintenance_tasks: List[PerformanceRecommendation] = Field(..., description="Low priority maintenance recommendations")
4761 |
class PerformanceRecommendationsResponse(BaseModel):
    """Response model for performance recommendations.

    Top-level payload of GET /performance/recommendations combining the
    flat recommendation list, summary counts and the roadmap grouping.
    """
    recommendations: List[PerformanceRecommendation] = Field(..., description="List of actionable recommendations")
    summary: RecommendationSummary = Field(..., description="Summary statistics")
    implementation_roadmap: ImplementationRoadmap = Field(..., description="Recommendations organized by implementation strategy")
    timestamp: str = Field(..., description="ISO timestamp of analysis")
4768 |
# ---------- Performance Recommendations Endpoint ----------

def _rows_to_dicts(cursor) -> List[dict]:
    """Return all remaining rows from *cursor* as dicts keyed by column name."""
    columns = [d[0] for d in cursor.description]
    return [dict(zip(columns, row, strict=False)) for row in cursor.fetchall()]


@app.get(
    "/performance/recommendations",
    response_model=PerformanceRecommendationsResponse,
    tags=["Performance Profiler"],
    summary="Generate actionable performance optimization recommendations",
    description="""
    AI-powered optimization recommendations engine providing:
    
    - **Prioritized recommendations** with impact and effort scoring
    - **Implementation roadmaps** categorized by complexity and impact
    - **Detailed implementation steps** with prerequisites and metrics
    - **Cost-benefit analysis** with quantified impact estimates
    - **Progress tracking guidance** with success metrics
    
    Smart recommendation system analyzes performance data to provide actionable optimization strategies.
    """,
    responses={
        200: {
            "description": "Comprehensive optimization recommendations with implementation guidance",
            "content": {
                "application/json": {
                    "example": {
                        "recommendations": [
                            {
                                "id": "optimize_tool_smart_browser",
                                "type": "tool_optimization",
                                "priority": "high",
                                "title": "Optimize smart_browser performance",
                                "description": "Tool consumes 1543.2s total execution time with 25.3s average",
                                "impact_estimate": {
                                    "time_savings_potential": 463.0,
                                    "affected_actions": 61,
                                    "cost_benefit_ratio": 8.5
                                },
                                "implementation_steps": [
                                    "Profile smart_browser execution to identify bottlenecks",
                                    "Consider caching frequently used data",
                                    "Optimize database queries if applicable",
                                    "Evaluate alternative implementations or libraries"
                                ],
                                "estimated_effort": "medium",
                                "prerequisites": ["Development environment setup", "Performance profiling tools"],
                                "metrics_to_track": [
                                    "Average execution time",
                                    "P95 execution time",
                                    "Tool success rate",
                                    "Resource utilization"
                                ]
                            }
                        ],
                        "summary": {
                            "total_recommendations": 5,
                            "high_priority": 2,
                            "medium_priority": 2,
                            "low_priority": 1,
                            "estimated_total_savings": 1250.5,
                            "analysis_period_hours": 24
                        },
                        "implementation_roadmap": {
                            "quick_wins": [],
                            "major_improvements": [],
                            "maintenance_tasks": []
                        },
                        "timestamp": "2024-01-01T12:00:00"
                    }
                }
            }
        },
        500: {
            "description": "Internal server error",
            "content": {
                "application/json": {
                    "example": {"detail": "Failed to generate recommendations"}
                }
            }
        }
    }
)
async def get_performance_recommendations(
    hours_back: int = Query(
        24,
        description="Hours back to analyze for recommendations",
        ge=1,
        le=720,
        example=24
    ),
    priority_filter: str = Query(
        "all",
        description="Filter recommendations by priority level",
        regex="^(all|high|medium|low)$",
        example="all"
    )
) -> PerformanceRecommendationsResponse:
    """Generate actionable performance optimization recommendations.

    Mines the ``actions`` table over the trailing ``hours_back`` window for
    three classes of optimization opportunity — slow tools, workflows that
    would benefit from parallelization, and error-prone tools — and converts
    each into a prioritized ``PerformanceRecommendation``. The response also
    carries summary statistics and a coarse implementation roadmap.

    Args:
        hours_back: Size of the trailing analysis window, in hours (1-720).
        priority_filter: Restrict results to a single priority level, or
            ``"all"`` for no filtering.

    Raises:
        HTTPException: 500 on database or unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            since_timestamp = datetime.now().timestamp() - (hours_back * 3600)

            # Query 1: tools whose average call latency exceeds 5 seconds,
            # ordered by total time consumed.
            cursor.execute("""
                SELECT 
                    tool_name,
                    COUNT(*) as usage_count,
                    AVG(completed_at - started_at) as avg_duration,
                    MAX(completed_at - started_at) as max_duration,
                    SUM(completed_at - started_at) as total_time
                FROM actions 
                WHERE started_at >= ? AND completed_at IS NOT NULL
                GROUP BY tool_name
                HAVING avg_duration > 5
                ORDER BY total_time DESC
            """, (since_timestamp,))
            slow_tools = _rows_to_dicts(cursor)

            # Query 2: workflows where the sum of per-action durations is much
            # larger than the wall-clock span, i.e. work ran mostly serially.
            cursor.execute("""
                SELECT 
                    workflow_id,
                    COUNT(*) as action_count,
                    SUM(completed_at - started_at) as total_sequential_time,
                    (MAX(completed_at) - MIN(started_at)) as actual_elapsed_time
                FROM actions 
                WHERE started_at >= ? AND completed_at IS NOT NULL AND workflow_id IS NOT NULL
                GROUP BY workflow_id
                HAVING action_count > 3 AND total_sequential_time > actual_elapsed_time * 1.5
                ORDER BY (total_sequential_time - actual_elapsed_time) DESC
            """, (since_timestamp,))
            parallelization_opps = _rows_to_dicts(cursor)

            # Query 3: tools whose failure rate within the window exceeds 5%.
            cursor.execute("""
                SELECT 
                    tool_name,
                    COUNT(*) as error_count,
                    COUNT(*) * 100.0 / (
                        SELECT COUNT(*) FROM actions a2 
                        WHERE a2.tool_name = actions.tool_name AND a2.started_at >= ?
                    ) as error_rate
                FROM actions 
                WHERE started_at >= ? AND status = 'failed'
                GROUP BY tool_name
                HAVING error_rate > 5
                ORDER BY error_rate DESC
            """, (since_timestamp, since_timestamp))
            error_prone_tools = _rows_to_dicts(cursor)
        finally:
            # Always release the connection — the original leaked it whenever
            # a query raised before reaching the success-path close().
            conn.close()

        recommendations = []

        for tool in slow_tools[:5]:  # Top 5 slowest tools
            impact_score = tool['total_time'] / 3600  # hours of time spent
            priority = 'high' if impact_score > 1 else 'medium' if impact_score > 0.5 else 'low'

            recommendations.append(PerformanceRecommendation(
                id=f"optimize_tool_{tool['tool_name']}",
                type='tool_optimization',
                priority=priority,
                title=f"Optimize {tool['tool_name']} performance",
                description=f"Tool consumes {tool['total_time']:.1f}s total execution time with {tool['avg_duration']:.2f}s average",
                impact_estimate=ImpactEstimate(
                    time_savings_potential=tool['total_time'] * 0.3,  # Assume 30% improvement possible
                    affected_actions=tool['usage_count'],
                    cost_benefit_ratio=impact_score
                ),
                implementation_steps=[
                    f"Profile {tool['tool_name']} execution to identify bottlenecks",
                    "Consider caching frequently used data",
                    "Optimize database queries if applicable",
                    "Evaluate alternative implementations or libraries"
                ],
                estimated_effort='medium',
                prerequisites=['Development environment setup', 'Performance profiling tools'],
                metrics_to_track=[
                    'Average execution time',
                    'P95 execution time',
                    'Tool success rate',
                    'Resource utilization'
                ]
            ))

        for opp in parallelization_opps[:3]:  # Top 3 parallelization opportunities
            time_savings = opp['total_sequential_time'] - opp['actual_elapsed_time']
            priority = 'high' if time_savings > 30 else 'medium'

            recommendations.append(PerformanceRecommendation(
                id=f"parallelize_workflow_{opp['workflow_id']}",
                type='parallelization',
                priority=priority,
                title=f"Parallelize workflow {opp['workflow_id']}",
                description=f"Workflow could save {time_savings:.2f}s through better parallelization",
                impact_estimate=ImpactEstimate(
                    time_savings_potential=time_savings,
                    efficiency_improvement=(time_savings / opp['total_sequential_time']) * 100,
                    affected_workflows=1,
                    affected_actions=opp['action_count'],
                    cost_benefit_ratio=time_savings / 10  # Arbitrary scaling
                ),
                implementation_steps=[
                    "Analyze action dependencies in the workflow",
                    "Identify independent action sequences",
                    "Implement async execution patterns",
                    "Add proper synchronization points"
                ],
                estimated_effort='high',
                prerequisites=['Workflow dependency analysis', 'Async execution framework'],
                metrics_to_track=[
                    'Workflow end-to-end time',
                    'Action parallelization ratio',
                    'Resource utilization efficiency'
                ]
            ))

        for tool in error_prone_tools[:3]:  # Top 3 error-prone tools
            priority = 'high' if tool['error_rate'] > 20 else 'medium'

            recommendations.append(PerformanceRecommendation(
                id=f"improve_reliability_{tool['tool_name']}",
                type='reliability_improvement',
                priority=priority,
                title=f"Improve {tool['tool_name']} reliability",
                description=f"Tool has {tool['error_rate']:.1f}% failure rate ({tool['error_count']} failures)",
                impact_estimate=ImpactEstimate(
                    reliability_improvement=tool['error_rate'],
                    affected_actions=tool['error_count'],
                    user_experience_impact='high',
                    cost_benefit_ratio=tool['error_rate'] / 10,
                    time_savings_potential=0  # Reliability doesn't directly save time
                ),
                implementation_steps=[
                    "Analyze failure patterns and root causes",
                    "Implement better error handling and retries",
                    "Add input validation and sanitization",
                    "Improve tool documentation and usage examples"
                ],
                estimated_effort='medium',
                prerequisites=['Error logging analysis', 'Tool source code access'],
                metrics_to_track=[
                    'Tool failure rate',
                    'Time to recovery',
                    'User satisfaction scores'
                ]
            ))

        # Filter recommendations by priority if requested
        if priority_filter != 'all':
            recommendations = [r for r in recommendations if r.priority == priority_filter]

        # Sort by priority first, then by estimated time savings (descending)
        priority_order = {'high': 3, 'medium': 2, 'low': 1}
        recommendations.sort(key=lambda x: (
            priority_order.get(x.priority, 0),
            x.impact_estimate.time_savings_potential
        ), reverse=True)

        summary = RecommendationSummary(
            total_recommendations=len(recommendations),
            high_priority=len([r for r in recommendations if r.priority == 'high']),
            medium_priority=len([r for r in recommendations if r.priority == 'medium']),
            low_priority=len([r for r in recommendations if r.priority == 'low']),
            estimated_total_savings=sum(r.impact_estimate.time_savings_potential for r in recommendations),
            analysis_period_hours=hours_back
        )

        # NOTE(review): quick_wins can only be non-empty if some recommendation
        # is ever emitted with estimated_effort='low'; none currently are.
        roadmap = ImplementationRoadmap(
            quick_wins=[r for r in recommendations if r.estimated_effort == 'low' and r.priority == 'high'],
            major_improvements=[r for r in recommendations if r.estimated_effort == 'high' and r.priority == 'high'],
            maintenance_tasks=[r for r in recommendations if r.priority == 'low']
        )

        return PerformanceRecommendationsResponse(
            recommendations=recommendations,
            summary=summary,
            implementation_roadmap=roadmap,
            timestamp=datetime.now().isoformat()
        )

    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
5062 |
# ---------- Module-level singletons for Body parameters ----------

# Define Body parameters as module-level singletons to avoid B008 warnings
# (flake8-bugbear: "function call in default argument"). Endpoints reference
# these shared sentinels in their signatures instead of calling Body(...)
# inline; FastAPI treats them identically to an inline Body(...) default.
WORKFLOW_SCHEDULE_BODY = Body(...)
RESTORE_STATE_BODY = Body(...)
5068 |
5069 | # ---------- Workflow Management Pydantic Models ----------
5070 |
class WorkflowScheduleRequest(BaseModel):
    """Payload accepted when scheduling a workflow for future execution."""

    # When the workflow should run.
    scheduled_at: datetime = Field(
        ...,
        example="2024-01-01T12:00:00Z",
        description="ISO timestamp for when to execute the workflow",
    )
    # Relative execution ordering; lower numbers run first.
    priority: int = Field(
        5,
        example=3,
        ge=1,
        le=10,
        description="Execution priority (1=highest, 10=lowest)",
    )
5085 |
class ScheduleData(BaseModel):
    """Details describing a single workflow schedule entry."""

    workflow_id: str = Field(
        ...,
        description="ID of the scheduled workflow",
    )
    scheduled_at: str = Field(
        ...,
        description="Scheduled execution time",
    )
    priority: int = Field(
        ...,
        description="Execution priority",
    )
    status: str = Field(
        ...,
        description="Schedule status",
    )
    created_at: str = Field(
        ...,
        description="When the schedule was created",
    )
5093 |
class WorkflowScheduleResponse(BaseModel):
    """Envelope returned after a workflow has been scheduled."""

    success: bool = Field(
        ...,
        description="Whether scheduling was successful",
    )
    schedule_id: str = Field(
        ...,
        description="Unique identifier for this schedule",
    )
    message: str = Field(
        ...,
        description="Success or error message",
    )
    schedule_data: ScheduleData = Field(
        ...,
        description="Details of the created schedule",
    )
5100 |
5101 | # ---------- Cognitive State Restoration Models ----------
5102 |
class RestoreStateRequest(BaseModel):
    """Payload selecting how a stored cognitive state should be restored."""

    # Constrained to a closed set of modes via the regex.
    restore_mode: str = Field(
        "full",
        example="full",
        regex="^(full|partial|snapshot)$",
        description="Type of restoration to perform",
    )
5111 |
class RestoreData(BaseModel):
    """Receipt describing an initiated cognitive-state restoration."""

    state_id: str = Field(
        ...,
        description="ID of the state being restored",
    )
    restore_mode: str = Field(
        ...,
        description="Restoration mode used",
    )
    restored_at: str = Field(
        ...,
        description="When the restoration occurred",
    )
    original_timestamp: Optional[float] = Field(
        None,
        description="Original state timestamp",
    )
5118 |
class RestoreStateResponse(BaseModel):
    """Envelope returned by the state-restoration endpoint."""

    success: bool = Field(
        ...,
        description="Whether restoration was successful",
    )
    message: str = Field(
        ...,
        description="Success or error message",
    )
    restore_data: RestoreData = Field(
        ...,
        description="Details of the restoration",
    )
5124 |
5125 | # ---------- Health Check Models ----------
5126 |
class HealthResponse(BaseModel):
    """Shape of the /health probe response."""

    status: str = Field(
        ...,
        example="ok",
        description="Health status indicator",
    )
    version: str = Field(
        ...,
        example="0.1.0",
        description="Server version string",
    )
5131 |
5132 | # ---------- Workflow Management Endpoints ----------
5133 |
@app.post(
    "/workflows/{workflow_id}/schedule",
    response_model=WorkflowScheduleResponse,
    tags=["Workflow Management"],
    summary="Schedule workflow execution",
    description="""
    Schedule a workflow for future execution with configurable priority and timing:
    
    - **Workflow scheduling** with specific timing
    - **Priority management** for execution order
    - **Status tracking** for scheduled workflows
    - **Integration** with workflow execution system
    
    Essential for orchestrating complex multi-step processes and time-based automation.
    """,
    responses={
        200: {
            "description": "Workflow scheduled successfully",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "schedule_id": "sched_workflow_123_1704067200",
                        "message": "Workflow scheduled successfully",
                        "schedule_data": {
                            "workflow_id": "workflow_123",
                            "scheduled_at": "2024-01-01T12:00:00Z",
                            "priority": 3,
                            "status": "scheduled",
                            "created_at": "2024-01-01T10:00:00Z"
                        }
                    }
                }
            }
        },
        400: {"description": "Invalid request parameters"},
        404: {"description": "Workflow not found"},
        500: {"description": "Internal server error"}
    }
)
async def schedule_workflow(
    workflow_id: str = ApiPath(..., description="Unique identifier of the workflow to schedule", example="workflow_abc123", regex="^[a-zA-Z0-9_-]+$"),
    request: WorkflowScheduleRequest = WORKFLOW_SCHEDULE_BODY
) -> WorkflowScheduleResponse:
    """Register a future execution slot for the given workflow.

    Placeholder implementation: fabricates a schedule record and a unique
    schedule id without persisting anything or enqueuing a task. A real
    system would verify the workflow exists, create a scheduled task in a
    task queue, and store the schedule in a database.
    """
    try:
        # Build the schedule record that will be echoed back to the caller.
        details = ScheduleData(
            workflow_id=workflow_id,
            scheduled_at=request.scheduled_at.isoformat(),
            priority=request.priority,
            status="scheduled",
            created_at=datetime.now().isoformat(),
        )

        # Schedule ids embed the workflow id plus a second-resolution epoch.
        sched_id = f"sched_{workflow_id}_{int(datetime.now().timestamp())}"

        return WorkflowScheduleResponse(
            success=True,
            schedule_id=sched_id,
            message="Workflow scheduled successfully",
            schedule_data=details,
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to schedule workflow: {str(e)}") from e
5207 |
5208 | # ---------- Cognitive State Restoration Endpoint ----------
5209 |
@app.post(
    "/cognitive-states/{state_id}/restore",
    response_model=RestoreStateResponse,
    tags=["Cognitive States"],
    summary="Restore a previous cognitive state",
    description="""
    Restore the system to a previous cognitive state for analysis or recovery:
    
    - **State restoration** with configurable restore modes
    - **Temporal analysis** by reverting to specific points in time
    - **Recovery mechanisms** for problematic state transitions
    - **Research capabilities** for understanding state evolution
    
    Critical for debugging cognitive state issues and temporal analysis of system behavior.
    """,
    responses={
        200: {
            "description": "Cognitive state restoration initiated",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "message": "Cognitive state restoration initiated",
                        "restore_data": {
                            "state_id": "state_abc123xyz789",
                            "restore_mode": "full",
                            "restored_at": "2024-01-01T12:00:00Z",
                            "original_timestamp": 1703980800.0
                        }
                    }
                }
            }
        },
        400: {"description": "Invalid request parameters"},
        404: {"description": "Cognitive state not found"},
        500: {"description": "Internal server error"}
    }
)
async def restore_cognitive_state(
    state_id: str = ApiPath(
        ...,
        description="Unique identifier of the cognitive state to restore",
        example="state_abc123xyz789",
        regex="^[a-zA-Z0-9_-]+$"
    ),
    request: RestoreStateRequest = RESTORE_STATE_BODY
) -> RestoreStateResponse:
    """Restore a cognitive state.

    Looks up the requested state row and, if present, returns a
    ``RestoreData`` receipt. The system-level restoration itself is a
    placeholder (see inline notes).

    Raises:
        HTTPException: 404 if the state does not exist, 500 on database or
            unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get the state to restore
            cursor.execute("SELECT * FROM cognitive_timeline_states WHERE state_id = ?", (state_id,))
            state = cursor.fetchone()
        finally:
            # Close on every path — the original leaked the connection
            # whenever the 404 below (or a sqlite error) was raised.
            conn.close()

        if not state:
            raise HTTPException(
                status_code=404,
                detail=f"Cognitive state with ID '{state_id}' not found"
            )

        # Create restoration data
        restore_data = RestoreData(
            state_id=state_id,
            restore_mode=request.restore_mode,
            restored_at=datetime.now().isoformat(),
            original_timestamp=state[1]  # assumes column 1 is the timestamp — TODO confirm schema
        )

        # In a real implementation, this would:
        # 1. Create a backup of the current state
        # 2. Restore the cognitive state to the system
        # 3. Update all dependent systems
        # 4. Log the restoration event

        return RestoreStateResponse(
            success=True,
            message="Cognitive state restoration initiated",
            restore_data=restore_data
        )

    except HTTPException:
        raise
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to restore state: {str(e)}") from e
5300 |
5301 | # ---------- Artifact Download Endpoint ----------
5302 |
@app.get(
    "/artifacts/{artifact_id}/download",
    tags=["Artifacts"],
    summary="Download artifact file or data",
    description="""
    Download the raw file or data associated with an artifact:
    
    - **File download** with proper content types
    - **Metadata preservation** in download headers
    - **Access logging** for audit trails
    - **Format handling** for different artifact types
    
    Essential for accessing artifact content outside the UMS Explorer interface.
    """,
    responses={
        200: {
            "description": "Artifact file downloaded successfully",
            "content": {
                "application/octet-stream": {
                    "schema": {"type": "string", "format": "binary"}
                },
                "application/json": {
                    "schema": {"type": "object"},
                    "example": {
                        "artifact_id": "artifact_123",
                        "name": "analysis_report",
                        "artifact_type": "document",
                        "description": "Quarterly analysis report",
                        "file_path": "/artifacts/reports/q4_2024.pdf",
                        "file_size": 2048576,
                        "created_at": 1703980800.0,
                        "metadata": {"author": "System", "version": "1.0"}
                    }
                }
            }
        },
        404: {
            "description": "Artifact not found",
            "content": {
                "application/json": {
                    "example": {"detail": "Artifact with ID 'artifact_123' not found"}
                }
            }
        },
        500: {"description": "Internal server error"}
    }
)
async def download_artifact(
    artifact_id: str = ApiPath(
        ...,
        description="Unique identifier of the artifact to download",
        example="artifact_abc123",
        regex="^[a-zA-Z0-9_-]+$"
    )
):
    """Download an artifact.

    Currently serves the artifact's database row as a JSON attachment; a
    real implementation would stream the underlying file with its native
    content type.

    Raises:
        HTTPException: 404 if the artifact does not exist, 500 on database
            or unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            cursor.execute("SELECT * FROM artifacts WHERE artifact_id = ?", (artifact_id,))
            artifact = cursor.fetchone()

            if not artifact:
                raise HTTPException(
                    status_code=404,
                    detail=f"Artifact with ID '{artifact_id}' not found"
                )

            # Convert the row to a dict keyed by column name
            artifact_dict = dict(zip([d[0] for d in cursor.description], artifact, strict=False))
        finally:
            # Close on every path — the original leaked the connection on 404.
            conn.close()

        # For now, return the artifact data as JSON
        # In a real implementation, this would serve the actual file
        from fastapi.responses import Response

        content = json.dumps(artifact_dict, indent=2)
        filename = f"{artifact_dict.get('name', 'artifact')}.json"

        return Response(
            content=content,
            media_type='application/json',
            headers={
                # Bug fix: the header previously contained a hard-coded
                # placeholder string instead of the computed filename.
                'Content-Disposition': f'attachment; filename="{filename}"'
            }
        )

    except HTTPException:
        raise
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to download artifact: {str(e)}") from e
5398 |
5399 | # ---------- Health & Utilities Endpoints ----------
5400 |
@app.get(
    "/health",
    response_model=HealthResponse,
    tags=["Health & Utilities"],
    summary="Health check endpoint",
    description="""
    Check the health and operational status of the Ultimate MCP Server:
    
    - **Server status** verification
    - **Service availability** confirmation
    - **Version information** for compatibility checks
    - **Load balancer integration** support
    
    Standard health check endpoint for monitoring systems and operational dashboards.
    """,
    responses={
        200: {
            "description": "Server is healthy and operational",
            "content": {
                "application/json": {
                    "example": {
                        "status": "ok",
                        "version": "0.1.0"
                    }
                }
            }
        },
        500: {
            "description": "Server health check failed",
            "content": {
                "application/json": {
                    "example": {"detail": "Health check failed"}
                }
            }
        }
    }
)
async def health_check() -> HealthResponse:
    """Liveness probe: unconditionally reports the server as healthy."""
    return HealthResponse(status="ok", version="0.1.0")
5444 |
```