This is page 1 of 2. Use http://codebase.md/fradser/mcp-server-mas-sequential-thinking?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .env.example
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CLAUDE.md
├── Dockerfile
├── pyproject.toml
├── README.md
├── README.zh-CN.md
├── smithery.yaml
├── src
│   └── mcp_server_mas_sequential_thinking
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   ├── constants.py
│       │   └── modernized_config.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── models.py
│       │   ├── session.py
│       │   └── types.py
│       ├── infrastructure
│       │   ├── __init__.py
│       │   ├── logging_config.py
│       │   └── persistent_memory.py
│       ├── main.py
│       ├── processors
│       │   ├── __init__.py
│       │   ├── multi_thinking_core.py
│       │   └── multi_thinking_processor.py
│       ├── routing
│       │   ├── __init__.py
│       │   ├── agno_workflow_router.py
│       │   ├── ai_complexity_analyzer.py
│       │   ├── complexity_types.py
│       │   └── multi_thinking_router.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_builder.py
│       │   ├── processing_orchestrator.py
│       │   ├── response_formatter.py
│       │   ├── response_processor.py
│       │   ├── retry_handler.py
│       │   ├── server_core.py
│       │   ├── thought_processor_refactored.py
│       │   └── workflow_executor.py
│       └── utils
│           ├── __init__.py
│           └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env

# Testing
.hypothesis/
```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# --- LLM Configuration ---

# Select the LLM provider: "deepseek" (default), "groq", "openrouter", "github", "anthropic", or "ollama"
LLM_PROVIDER="deepseek"

# Provide the API key for the chosen provider:
# GROQ_API_KEY="your_groq_api_key"
DEEPSEEK_API_KEY="your_deepseek_api_key"
# OPENROUTER_API_KEY="your_openrouter_api_key"
# GITHUB_TOKEN="ghp_your_github_personal_access_token"
# ANTHROPIC_API_KEY="your_anthropic_api_key"
# Note: Ollama requires no API key but needs local installation
# Note: GitHub Models requires a GitHub Personal Access Token with appropriate scopes

# Optional: Base URL override (e.g., for custom endpoints)
LLM_BASE_URL="your_base_url_if_needed"

# Optional: Specify different models for Enhanced (Complex Synthesis) and Standard (Individual Processing)
# Defaults are set within the code based on the provider if these are not set.
# Example for Groq: # GROQ_ENHANCED_MODEL_ID="openai/gpt-oss-120b" # For complex synthesis # GROQ_STANDARD_MODEL_ID="openai/gpt-oss-20b" # For individual processing # Example for DeepSeek: # DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat" # For complex synthesis # DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat" # For individual processing # Example for GitHub Models: # GITHUB_ENHANCED_MODEL_ID="openai/gpt-5" # For complex synthesis # GITHUB_STANDARD_MODEL_ID="openai/gpt-5-min" # For individual processing # Example for OpenRouter: # OPENROUTER_ENHANCED_MODEL_ID="deepseek/deepseek-chat-v3-0324" # For synthesis # OPENROUTER_STANDARD_MODEL_ID="deepseek/deepseek-r1" # For processing # Example for Ollama: # OLLAMA_ENHANCED_MODEL_ID="devstral:24b" # For complex synthesis # OLLAMA_STANDARD_MODEL_ID="devstral:24b" # For individual processing # --- Enhanced Agno 1.8+ Features --- # Enable enhanced agents with advanced reasoning, memory, and structured outputs USE_ENHANCED_AGENTS="true" # Team mode: "standard", "enhanced", "hybrid", or "multi_thinking" # standard: Traditional agent setup (backward compatible) # enhanced: Use new Agno 1.8+ features (memory, reasoning, structured outputs) # hybrid: Mix of standard and enhanced agents for optimal performance # multi_thinking: Use Multi-Thinking methodology for balanced thinking (recommended) TEAM_MODE="standard" # --- External Tools --- # Required ONLY if web research capabilities are needed for thinking agents EXA_API_KEY="your_exa_api_key" # --- Adaptive Routing & Cost Optimization --- # Enable adaptive routing (automatically selects single vs multi-agent based on complexity) ENABLE_ADAPTIVE_ROUTING="true" # Multi-Thinking intelligent routing ENABLE_MULTI_THINKING="true" # Enable Multi-Thinking methodology for enhanced thinking MULTI_THINKING_COMPLEXITY_THRESHOLD="5.0" # Minimum complexity to trigger Multi-Thinking processing # Cost optimization settings DAILY_BUDGET_LIMIT="" # e.g., "5.0" for $5 daily limit MONTHLY_BUDGET_LIMIT="" # e.g., "50.0" for $50 monthly limit PER_THOUGHT_BUDGET_LIMIT="" # e.g., "0.10" for $0.10 per thought limit QUALITY_THRESHOLD="0.7" # Minimum quality score (0.0-1.0) # Response optimization settings RESPONSE_STYLE="practical" # "practical", "academic", or "balanced" MAX_RESPONSE_LENGTH="800" # Maximum response length in words ENFORCE_SIMPLICITY="true" # Remove excessive academic complexity # Provider cost overrides (cost per 1K tokens) # DEEPSEEK_COST_PER_1K_TOKENS="0.0002" # GROQ_COST_PER_1K_TOKENS="0.0" # GITHUB_COST_PER_1K_TOKENS="0.0005" # OPENROUTER_COST_PER_1K_TOKENS="0.001" # OLLAMA_COST_PER_1K_TOKENS="0.0" # --- Persistent Memory --- # Database URL for persistent storage (defaults to local SQLite) DATABASE_URL="" # e.g., "postgresql://user:pass@localhost/dbname" or leave empty for SQLite # Memory management MEMORY_PRUNING_DAYS="30" # Prune sessions older than X days MEMORY_KEEP_RECENT="100" # Always keep X most recent sessions # Note: Logs are stored in ~/.mas_sequential_thinking/logs/ directory # The log file is named mas_sequential_thinking.log with rotation # --- Logging and Performance --- # Core logging configuration LOG_LEVEL="INFO" # "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" LOG_FORMAT="text" # "text" (readable) or "json" (structured) LOG_TARGETS="file,console" # Comma-separated: "file", "console" LOG_FILE_MAX_SIZE="10MB" # Maximum log file size (supports KB, MB, GB) LOG_FILE_BACKUP_COUNT="5" # Number of backup log files to keep LOG_SAMPLING_RATE="1.0" # Log sampling rate (0.0-1.0, 1.0 = all logs) # Smart logging 
configuration (reduces log verbosity while maintaining insights)
SMART_LOGGING="true" # Enable intelligent logging with adaptive verbosity
SMART_LOG_LEVEL="performance" # "critical", "performance", "routing", "debug"
LOG_PERFORMANCE_ISSUES="true" # Always log slow/expensive processing
LOG_RESPONSE_QUALITY="true" # Log overly complex or academic responses

# Performance and error handling
MAX_RETRIES="3"
TIMEOUT="30.0"
PERFORMANCE_MONITORING="true" # Enable real-time performance monitoring
PERFORMANCE_BASELINE_TIME="30.0" # Baseline time per thought in seconds
PERFORMANCE_BASELINE_EFFICIENCY="0.8" # Target efficiency score
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Sequential Thinking Multi-Agent System (MAS)

English | [简体中文](README.zh-CN.md)

This project implements an advanced sequential thinking process using a **Multi-Agent System (MAS)** built with the **Agno** framework and served via **MCP**. It represents a significant evolution from simpler state-tracking approaches by leveraging coordinated, specialized agents for deeper analysis and problem decomposition.

## What is This?

This is an **MCP server** - not a standalone application. It runs as a background service that extends your LLM client (like Claude Desktop) with sophisticated sequential thinking capabilities. The server provides a `sequentialthinking` tool that processes thoughts through multiple specialized AI agents, each examining the problem from a different cognitive angle.

## Core Architecture: Multi-Dimensional Thinking Agents

The system employs **6 specialized thinking agents**, each focused on a distinct cognitive perspective:

### 1. **Factual Agent**

- **Focus**: Objective facts and verified data
- **Approach**: Analytical, evidence-based reasoning
- **Capabilities**:
  - Web research for current facts (via ExaTools)
  - Data verification and source citation
  - Information gap identification
- **Time allocation**: 120 seconds for thorough analysis

### 2. **Emotional Agent**

- **Focus**: Intuition and emotional intelligence
- **Approach**: Gut reactions and feelings
- **Capabilities**:
  - Quick intuitive responses (30-second snapshots)
  - Visceral reactions without justification
  - Emotional pattern recognition
- **Time allocation**: 30 seconds (quick reaction mode)

### 3. **Critical Agent**

- **Focus**: Risk assessment and problem identification
- **Approach**: Logical scrutiny and devil's advocate
- **Capabilities**:
  - Research counterexamples and failures (via ExaTools)
  - Identify logical flaws and risks
  - Challenge assumptions constructively
- **Time allocation**: 120 seconds for deep analysis

### 4. **Optimistic Agent**

- **Focus**: Benefits, opportunities, and value
- **Approach**: Positive exploration with realistic grounding
- **Capabilities**:
  - Research success stories (via ExaTools)
  - Identify feasible opportunities
  - Explore best-case scenarios logically
- **Time allocation**: 120 seconds for balanced optimism

### 5.
**Creative Agent** - **Focus**: Innovation and alternative solutions - **Approach**: Lateral thinking and idea generation - **Capabilities**: - Cross-industry innovation research (via ExaTools) - Divergent thinking techniques - Multiple solution generation - **Time allocation**: 240 seconds (creativity needs time) ### 6. **Synthesis Agent** - **Focus**: Integration and metacognitive orchestration - **Approach**: Holistic synthesis and final answer generation - **Capabilities**: - Integrate all perspectives into coherent response - Answer the original question directly - Provide actionable, user-friendly insights - **Time allocation**: 60 seconds for synthesis - **Note**: Uses enhanced model, does NOT include ExaTools (focuses on integration) ## AI-Powered Intelligent Routing The system uses **AI-driven complexity analysis** to determine the optimal thinking sequence: ### Processing Strategies: 1. **Single Agent** (Simple questions) - Direct factual or emotional response - Fastest processing for straightforward queries 2. **Double Agent** (Moderate complexity) - Two-step sequences (e.g., Optimistic → Critical) - Balanced perspectives for evaluation tasks 3. **Triple Agent** (Core thinking) - Factual → Creative → Synthesis - Philosophical and analytical problems 4. **Full Sequence** (Complex problems) - All 6 agents orchestrated together - Comprehensive multi-perspective analysis The AI analyzer evaluates: - Problem complexity and semantic depth - Primary problem type (factual, emotional, creative, philosophical, etc.) - Required thinking modes for optimal solution - Appropriate model selection (Enhanced vs Standard) ### AI Routing Flow Diagram ```mermaid flowchart TD A[Input Thought] --> B[AI Complexity Analyzer] B --> C{Problem Analysis} C --> C1[Complexity Score<br/>0-100] C --> C2[Problem Type<br/>FACTUAL/EMOTIONAL/<br/>CREATIVE/PHILOSOPHICAL] C --> C3[Required Thinking Modes] C1 --> D{Routing Decision} C2 --> D C3 --> D D -->|Score: 0-25<br/>Simple| E1[Single Agent Strategy] D -->|Score: 26-50<br/>Moderate| E2[Double Agent Strategy] D -->|Score: 51-75<br/>Complex| E3[Triple Agent Strategy] D -->|Score: 76-100<br/>Highly Complex| E4[Full Sequence Strategy] %% Single Agent Flow E1 --> F1[Factual Agent<br/>120s + ExaTools] F1 --> G1[Direct Response] %% Double Agent Flow (Full Parallel) E2 --> DA1[Both Agents Run in Parallel] DA1 --> DA2["Agent 1 e.g. Optimistic<br/>120s + ExaTools"] DA1 --> DA3["Agent 2 e.g. 
Critical<br/>120s + ExaTools"] DA2 --> G2[Programmatic Synthesis<br/>Combines both parallel results] DA3 --> G2 %% Triple Agent Flow (Full Parallel) E3 --> TA1[All 3 Agents Run in Parallel] TA1 --> TA2[Factual Agent<br/>120s + ExaTools] TA1 --> TA3[Creative Agent<br/>240s + ExaTools] TA1 --> TA4[Critical Agent<br/>120s + ExaTools] TA2 --> G3[Programmatic Synthesis<br/>Integrates all 3 results] TA3 --> G3 TA4 --> G3 %% Full Sequence Flow (3-Step Process) E4 --> FS1[Step 1: Initial Synthesis<br/>60s Enhanced Model<br/>Initial orchestration] FS1 --> FS2[Step 2: Parallel Execution<br/>5 Agents Run Simultaneously] FS2 --> FS2A[Factual Agent<br/>120s + ExaTools] FS2 --> FS2B[Emotional Agent<br/>30s Quick Response] FS2 --> FS2C[Optimistic Agent<br/>120s + ExaTools] FS2 --> FS2D[Critical Agent<br/>120s + ExaTools] FS2 --> FS2E[Creative Agent<br/>240s + ExaTools] FS2A --> FS3[Step 3: Final Synthesis<br/>60s Enhanced Model<br/>Integrates all parallel results] FS2B --> FS3 FS2C --> FS3 FS2D --> FS3 FS2E --> FS3 FS3 --> G4[Final Synthesis Output<br/>Comprehensive integrated result] G1 --> H[Next Iteration or<br/>Final Answer] G2 --> H G3 --> H G4 --> H style A fill:#e1f5fe style B fill:#f3e5f5 style C fill:#fff3e0 style D fill:#e8f5e8 style TA1 fill:#ffecb3 style FS2 fill:#ffecb3 style G1 fill:#fce4ec style G2 fill:#fce4ec style G3 fill:#fce4ec style G4 fill:#fce4ec style H fill:#f1f8e9 ``` **Key Insights:** - **Parallel Execution**: Non-synthesis agents run simultaneously for maximum efficiency - **Synthesis Integration**: Synthesis agents process parallel results sequentially - **Two Processing Types**: - **Synthesis Agent**: Real AI agent using Enhanced Model for integration - **Programmatic Synthesis**: Code-based combination when no Synthesis Agent - **Performance**: Parallel processing optimizes both speed and quality ## Research Capabilities (ExaTools Integration) **4 out of 6 agents** are equipped with web research capabilities via ExaTools: - **Factual Agent**: Search for current facts, statistics, verified data - **Critical Agent**: Find counterexamples, failed cases, regulatory issues - **Optimistic Agent**: Research success stories, positive case studies - **Creative Agent**: Discover innovations across different industries - **Emotional & Synthesis Agents**: No ExaTools (focused on internal processing) Research is **optional** - requires `EXA_API_KEY` environment variable. The system works perfectly without it, using pure reasoning capabilities. ## Model Intelligence ### Dual Model Strategy: - **Enhanced Model**: Used for Synthesis agent (complex integration tasks) - **Standard Model**: Used for individual thinking agents - **AI Selection**: System automatically chooses the right model based on task complexity ### Supported Providers: - **DeepSeek** (default) - High performance, cost-effective - **Groq** - Ultra-fast inference - **OpenRouter** - Access to multiple models - **GitHub Models** - OpenAI models via GitHub API - **Anthropic** - Claude models with prompt caching - **Ollama** - Local model execution ## Key Differences from Original Version (TypeScript) This Python/Agno implementation marks a fundamental shift from the original TypeScript version: | Feature/Aspect | Python/Agno Version (Current) | TypeScript Version (Original) | | :------------------ | :------------------------------------------------------------------- | :--------------------------------------------------- | | **Architecture** | **Multi-Agent System (MAS)**; Active processing by a team of agents. 
| **Single Class State Tracker**; Simple logging/storing. |
| **Intelligence** | **Distributed Agent Logic**; Embedded in specialized agents & Coordinator. | **External LLM Only**; No internal intelligence. |
| **Processing** | **Active Analysis & Synthesis**; Agents *act* on the thought. | **Passive Logging**; Merely recorded the thought. |
| **Frameworks** | **Agno (MAS) + FastMCP (Server)**; Uses dedicated MAS library. | **MCP SDK only**. |
| **Coordination** | **Explicit Team Coordination Logic** (`Team` in `coordinate` mode). | **None**; No coordination concept. |
| **Validation** | **Pydantic Schema Validation**; Robust data validation. | **Basic Type Checks**; Less reliable. |
| **External Tools** | **Integrated (Exa via Researcher)**; Can perform research tasks. | **None**. |
| **Logging** | **Structured Python Logging (File + Console)**; Configurable. | **Console Logging with Chalk**; Basic. |
| **Language & Ecosystem** | **Python**; Leverages Python AI/ML ecosystem. | **TypeScript/Node.js**. |

In essence, the system evolved from a passive thought *recorder* to an active thought *processor* powered by a collaborative team of AI agents.

## How it Works (Multi-Dimensional Processing)

1. **Initiation:** An external LLM uses the `sequentialthinking` tool to define the problem and initiate the process.
2. **Tool Call:** The LLM calls the `sequentialthinking` tool with the current thought, structured according to the `ThoughtData` model.
3. **AI Complexity Analysis:** The system uses AI-powered analysis to determine the optimal thinking sequence based on problem complexity and type.
4. **Agent Routing:** Based on the analysis, the system routes the thought to the appropriate thinking agents (single, double, triple, or full sequence).
5. **Parallel Processing:** Multiple thinking agents process the thought simultaneously from their specialized perspectives:
   - Factual agents gather objective data (with optional web research)
   - Critical agents identify risks and problems
   - Optimistic agents explore opportunities and benefits
   - Creative agents generate innovative solutions
   - Emotional agents provide intuitive insights
6. **Research Integration:** Agents equipped with ExaTools conduct targeted web research to enhance their analysis.
7. **Synthesis & Integration:** The Synthesis agent integrates all perspectives into a coherent, actionable response using enhanced models.
8. **Response Generation:** The system returns a comprehensive analysis with guidance for next steps.
9. **Iteration:** The calling LLM uses the synthesized response to formulate the next thinking step or conclude the process.

## Token Consumption Warning

**High Token Usage:** Due to the Multi-Agent System architecture, this tool consumes significantly **more tokens** than single-agent alternatives or the previous TypeScript version. Each `sequentialthinking` call invokes multiple specialized agents simultaneously; this parallel processing leads to substantially higher token usage (potentially 5-10x more than simpler sequential approaches) but provides correspondingly deeper and more comprehensive analysis.
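For a concrete sense of what each (token-consuming) iteration looks like, here is a minimal sketch of one `sequentialthinking` payload. Field names follow the `ThoughtData` schema described below; the values are hypothetical:

```python
# Illustrative `sequentialthinking` tool-call payload; values are hypothetical.
payload = {
    "thought": "Weigh SQLite vs PostgreSQL for session persistence.",
    "thoughtNumber": 2,          # second step in the sequence
    "totalThoughts": 5,          # current estimate of total steps
    "nextThoughtNeeded": True,   # the caller intends to keep iterating
    "isRevision": False,         # not revising an earlier thought
    "branchFromThought": None,   # not branching from a prior thought
    "branchId": None,
    "needsMoreThoughts": False,  # no extension beyond the estimate yet
}
```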
## MCP Tool: `sequentialthinking` The server exposes a single MCP tool that processes sequential thoughts: ### Parameters: ```typescript { thought: string, // Current thinking step content thoughtNumber: number, // Sequence number (≥1) totalThoughts: number, // Estimated total steps nextThoughtNeeded: boolean, // Is another step required? isRevision: boolean, // Revising previous thought? branchFromThought?: number, // Branch point (for exploration) branchId?: string, // Branch identifier needsMoreThoughts: boolean // Need to extend sequence? } ``` ### Response: Returns synthesized analysis from the multi-agent system with: - Processed thought analysis - Guidance for next steps - Branch and revision tracking - Status and metadata ## Installation ### Prerequisites - Python 3.10+ - LLM API access (choose one): - **DeepSeek**: `DEEPSEEK_API_KEY` (default, recommended) - **Groq**: `GROQ_API_KEY` - **OpenRouter**: `OPENROUTER_API_KEY` - **GitHub Models**: `GITHUB_TOKEN` - **Anthropic**: `ANTHROPIC_API_KEY` - **Ollama**: Local installation (no API key) - **Optional**: `EXA_API_KEY` for web research capabilities - `uv` package manager (recommended) or `pip` ### Quick Start #### 1. Install via Smithery (Recommended) ```bash npx -y @smithery/cli install @FradSer/mcp-server-mas-sequential-thinking --client claude ``` #### 2. Manual Installation ```bash # Clone the repository git clone https://github.com/FradSer/mcp-server-mas-sequential-thinking.git cd mcp-server-mas-sequential-thinking # Install with uv (recommended) uv pip install . # Or with pip pip install . ``` ### Configuration #### For MCP Clients (Claude Desktop, etc.) Add to your MCP client configuration: ```json { "mcpServers": { "sequential-thinking": { "command": "mcp-server-mas-sequential-thinking", "env": { "LLM_PROVIDER": "deepseek", "DEEPSEEK_API_KEY": "your_api_key", "EXA_API_KEY": "your_exa_key_optional" } } } } ``` #### Environment Variables Create a `.env` file or set these variables: ```bash # LLM Provider (required) LLM_PROVIDER="deepseek" # deepseek, groq, openrouter, github, anthropic, ollama DEEPSEEK_API_KEY="sk-..." # Optional: Enhanced/Standard Model Selection # DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat" # For synthesis # DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat" # For other agents # Optional: Web Research (enables ExaTools) # EXA_API_KEY="your_exa_api_key" # Optional: Custom endpoint # LLM_BASE_URL="https://custom-endpoint.com" ``` ### Model Configuration Examples ```bash # Groq with different models GROQ_ENHANCED_MODEL_ID="openai/gpt-oss-120b" GROQ_STANDARD_MODEL_ID="openai/gpt-oss-20b" # Anthropic with Claude models ANTHROPIC_ENHANCED_MODEL_ID="claude-3-5-sonnet-20241022" ANTHROPIC_STANDARD_MODEL_ID="claude-3-5-haiku-20241022" # GitHub Models GITHUB_ENHANCED_MODEL_ID="gpt-4o" GITHUB_STANDARD_MODEL_ID="gpt-4o-mini" ``` ## Usage ### As MCP Server Once installed and configured in your MCP client: 1. The `sequentialthinking` tool becomes available 2. Your LLM can use it to process complex thoughts 3. The system automatically routes to appropriate thinking agents 4. 
Results are synthesized and returned to your LLM ### Direct Execution Run the server manually for testing: ```bash # Using installed script mcp-server-mas-sequential-thinking # Using uv uv run mcp-server-mas-sequential-thinking # Using Python python src/mcp_server_mas_sequential_thinking/main.py ``` ## Development ### Setup ```bash # Clone repository git clone https://github.com/FradSer/mcp-server-mas-sequential-thinking.git cd mcp-server-mas-sequential-thinking # Create virtual environment python -m venv .venv source .venv/bin/activate # On Windows: .venv\Scripts\activate # Install with dev dependencies uv pip install -e ".[dev]" ``` ### Code Quality ```bash # Format and lint uv run ruff check . --fix uv run ruff format . uv run mypy . # Run tests (when available) uv run pytest ``` ### Testing with MCP Inspector ```bash npx @modelcontextprotocol/inspector uv run mcp-server-mas-sequential-thinking ``` Open http://127.0.0.1:6274/ and test the `sequentialthinking` tool. ## System Characteristics ### Strengths: - **Multi-perspective analysis**: 6 different cognitive approaches - **AI-powered routing**: Intelligent complexity analysis - **Research capabilities**: 4 agents with web search (optional) - **Flexible processing**: Single to full sequence strategies - **Model optimization**: Enhanced/Standard model selection - **Provider agnostic**: Works with multiple LLM providers ### Considerations: - **Token usage**: Multi-agent processing uses more tokens than single-agent - **Processing time**: Complex sequences take longer but provide deeper insights - **API costs**: Research capabilities require separate Exa API subscription - **Model selection**: Enhanced models cost more but provide better synthesis ## Project Structure ``` mcp-server-mas-sequential-thinking/ ├── src/mcp_server_mas_sequential_thinking/ │ ├── main.py # MCP server entry point │ ├── processors/ │ │ ├── multi_thinking_core.py # 6 thinking agents definition │ │ └── multi_thinking_processor.py # Sequential processing logic │ ├── routing/ │ │ ├── ai_complexity_analyzer.py # AI-powered analysis │ │ └── multi_thinking_router.py # Intelligent routing │ ├── services/ │ │ ├── thought_processor_refactored.py │ │ ├── workflow_executor.py │ │ └── context_builder.py │ └── config/ │ ├── modernized_config.py # Provider strategies │ └── constants.py # System constants ├── pyproject.toml # Project configuration └── README.md # This file ``` ## Changelog See [CHANGELOG.md](CHANGELOG.md) for version history. ## Contributing Contributions are welcome! Please ensure: 1. Code follows project style (ruff, mypy) 2. Commit messages use conventional commits format 3. All tests pass before submitting PR 4. Documentation is updated as needed ## License This project is licensed under the MIT License - see the LICENSE file for details. ## Acknowledgments - Built with [Agno](https://github.com/agno-agi/agno) v2.0+ framework - Model Context Protocol by [Anthropic](https://www.anthropic.com/) - Research capabilities powered by [Exa](https://exa.ai/) (optional) - Multi-dimensional thinking inspired by Edward de Bono's work ## Support - GitHub Issues: [Report bugs or request features](https://github.com/FradSer/mcp-server-mas-sequential-thinking/issues) - Documentation: Check CLAUDE.md for detailed implementation notes - MCP Protocol: [Official MCP Documentation](https://modelcontextprotocol.io/) --- **Note**: This is an MCP server, designed to work with MCP-compatible clients like Claude Desktop. It is not a standalone chat application. 
``` -------------------------------------------------------------------------------- /CLAUDE.md: -------------------------------------------------------------------------------- ```markdown # CLAUDE.md This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. ## Essential Commands ```bash # Setup & Installation uv pip install -e ".[dev]" # Install all dependencies uv run python -c "import agno; print('Agno imported successfully')" # Verify setup # Development Workflow uv run mcp-server-mas-sequential-thinking # Run server uv run ruff check . --fix && uv run ruff format . && uv run mypy . # Code quality uv run pytest --cov=. --cov-report=html # Test with coverage (no tests currently) # Monitoring & Debugging tail -f ~/.sequential_thinking/logs/sequential_thinking.log # Live logs grep "ERROR\|WARNING" ~/.sequential_thinking/logs/sequential_thinking.log # Error search ``` ### Additional Commands - **Upgrade agno**: `uv pip install --upgrade agno` - **Alternative server runs**: `uvx mcp-server-mas-sequential-thinking` or `uv run python src/mcp_server_mas_sequential_thinking/main.py` - **MCP Inspector**: `npx @modelcontextprotocol/inspector uv run python src/mcp_server_mas_sequential_thinking/main.py` ## Project Overview **Pure Multi-Thinking Implementation** built with **Agno v2.0** framework and served via MCP. Features **AI-powered intelligent routing** with streamlined architecture (src layout, Python 3.10+). The system processes thoughts through multi-directional thinking methodology with AI-driven complexity analysis and optimized model selection. ### Core Architecture **Entry Point:** `src/mcp_server_mas_sequential_thinking/main.py` - FastMCP application with `sequentialthinking` tool - Uses refactored service-based architecture with dependency injection - Global state management via `ServerState` and `ThoughtProcessor` **Multi-Thinking Processing Flow:** ``` External LLM → sequentialthinking tool → ThoughtProcessor → WorkflowExecutor → MultiThinkingWorkflowRouter → MultiThinkingSequentialProcessor → Individual Thinking Agents → Synthesis ``` **Core Services (Dependency Injection):** - **ThoughtProcessor**: Main orchestrator using specialized services - **WorkflowExecutor**: Manages Multi-Thinking workflow execution - **ContextBuilder**: Builds context-aware prompts - **ResponseFormatter**: Formats final responses - **SessionMemory**: Tracks thought history and branching **AI-Powered Routing System:** - **MultiThinkingIntelligentRouter**: AI-driven complexity analysis determines thinking sequence - **AIComplexityAnalyzer**: Uses LLM to assess thought complexity, problem type, and required thinking modes - **MultiThinkingSequentialProcessor**: Executes chosen sequence with model optimization - **Thinking Complexity levels**: SINGLE, DOUBLE, TRIPLE, FULL sequences - **Model Intelligence**: Enhanced model for Blue Hat synthesis, Standard model for individual hats ### Configuration & Data Flow **Environment Variables:** - `LLM_PROVIDER`: Provider selection (deepseek, groq, openrouter, ollama, github, anthropic) - `{PROVIDER}_API_KEY`: API keys (e.g., `DEEPSEEK_API_KEY`, `GITHUB_TOKEN`) - `{PROVIDER}_ENHANCED_MODEL_ID`: Enhanced model for complex synthesis (Blue Hat) - `{PROVIDER}_STANDARD_MODEL_ID`: Standard model for individual hat processing - `EXA_API_KEY`: Research capabilities (if using research agents) **AI-Driven Model Strategy:** - **Enhanced Models**: Used for Blue Hat (metacognitive) thinking - complex synthesis, integration - **Standard 
Models**: Used for individual hat processing (White, Red, Black, Yellow, Green) - **Intelligent Selection**: System automatically chooses appropriate model based on hat type and AI-assessed complexity - **AI Analysis**: Replaces rule-based pattern matching with semantic understanding **Processing Strategies (AI-Determined):** 1. **Single Hat**: Simple focused thinking (White Hat facts, Red Hat emotions, etc.) 2. **Double Hat**: Two-step sequences (e.g., Optimistic→Critical for idea evaluation) 3. **Triple Hat**: Core philosophical thinking (Factual→Creative→Synthesis) 4. **Full Sequence**: Complete Multi-Thinking methodology with Blue Hat orchestration ### Streamlined Module Architecture **Core Framework:** - `core/session.py`: SessionMemory for thought history (simplified, no Team dependency) - `core/models.py`: ThoughtData validation and core data structures - `core/types.py`: Type definitions and protocols - `config/modernized_config.py`: Provider strategies with Enhanced/Standard model configuration - `config/constants.py`: All system constants and configuration values **Multi-Thinking Implementation:** - `processors/multi_thinking_processor.py`: Main Multi-Thinking sequential processor - `processors/multi_thinking_core.py`: Hat definitions, agent factory, core logic - `routing/multi_thinking_router.py`: AI-powered intelligent routing based on thought complexity - `routing/ai_complexity_analyzer.py`: AI-driven complexity and problem type analysis - `routing/agno_workflow_router.py`: Agno Workflow integration layer - `routing/complexity_types.py`: Core complexity analysis types and enums **Service Layer:** - `services/thought_processor_refactored.py`: Main thought processor with dependency injection - `services/workflow_executor.py`: Multi-Thinking workflow execution - `services/context_builder.py`: Context-aware prompt building - `services/response_formatter.py`: Response formatting and extraction - `services/server_core.py`: Server lifecycle and state management **Infrastructure:** - `infrastructure/logging_config.py`: Structured logging with rotation - `infrastructure/persistent_memory.py`: Memory persistence capabilities - `utils/utils.py`: Logging utilities and helper functions ### Architecture Characteristics - **Clean Architecture**: Dependency injection, separation of concerns, service-based design - **AI-Driven Intelligence**: Pure AI-based complexity analysis replacing rule-based systems - **Multi-Thinking Focus**: Streamlined implementation without legacy multi-agent complexity - **Model Optimization**: Smart model selection (Enhanced for synthesis, Standard for processing) - **Modern Python**: Dataclasses, type hints, async/await, pattern matching - **Environment-based config**: No config files, all via environment variables - **Structured logging**: Rotation to `~/.sequential_thinking/logs/` ## Enhanced/Standard Model Configuration **Naming Convention:** - `{PROVIDER}_ENHANCED_MODEL_ID`: For complex synthesis tasks (Blue Hat thinking) - `{PROVIDER}_STANDARD_MODEL_ID`: For individual hat processing **Examples:** ```bash # GitHub Models GITHUB_ENHANCED_MODEL_ID="openai/gpt-5" # Blue Hat synthesis GITHUB_STANDARD_MODEL_ID="openai/gpt-5-min" # Individual hats # DeepSeek DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat" # Both synthesis and processing DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat" # Anthropic ANTHROPIC_ENHANCED_MODEL_ID="claude-3-5-sonnet-20241022" # Synthesis ANTHROPIC_STANDARD_MODEL_ID="claude-3-5-haiku-20241022" # Processing ``` **Usage Strategy:** - **Enhanced Model**: 
Blue Hat (metacognitive orchestrator) uses enhanced model for final synthesis - **Standard Model**: Individual hats (White, Red, Black, Yellow, Green) use standard model - **AI-Driven Selection**: System intelligently chooses model based on hat type and AI-assessed complexity ## Agno v2.0 Integration **Framework Features:** - **Workflow Integration**: Uses Agno Workflow system for Multi-Thinking processing - **Agent Factory**: Creates specialized hat agents with ReasoningTools - **Performance**: ~10,000x faster agent creation, ~50x less memory vs LangGraph - **Version**: Requires `agno>=2.0.5` **Key Integration Points:** - `MultiThinkingWorkflowRouter`: Bridges MCP and Agno Workflow systems - `MultiThinkingAgentFactory`: Creates individual hat agents using Agno v2.0 - **StepOutput**: Workflow results converted to Agno StepOutput format **For Agno Documentation**: Use deepwiki MCP reference with repoName: `agno-agi/agno` ## AI-Powered Complexity Analysis **Key Innovation**: The system uses AI instead of rule-based pattern matching for complexity analysis: - **AIComplexityAnalyzer**: Uses LLM to assess thought complexity, semantic depth, and problem characteristics - **Problem Type Detection**: AI identifies primary problem type (FACTUAL, EMOTIONAL, CREATIVE, PHILOSOPHICAL, etc.) - **Thinking Modes Recommendation**: AI suggests required thinking modes for optimal processing - **Semantic Understanding**: Replaces keyword matching with contextual analysis across languages **Benefits over Rule-Based Systems:** - Better handling of nuanced, philosophical, or cross-cultural content - Adaptive to new problem types without code changes - Semantic understanding vs simple pattern matching - Reduced maintenance overhead (no keyword lists to maintain) ## Development Notes **No Test Suite**: The project currently has no test files - all tests were removed during recent cleanup. **Recent Architecture Changes**: - Removed legacy multi-agent systems (agents/, optimization/, analysis/ modules) - Consolidated configuration (removed processing_constants.py redundancy) - Streamlined to 8 core modules focused on AI-driven Multi-Thinking **Code Quality**: Uses ruff for linting/formatting, mypy for type checking. Run `uv run ruff check . --fix && uv run ruff format . && uv run mypy .` before committing. ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/utils/__init__.py: -------------------------------------------------------------------------------- ```python """Utilities module for MCP Sequential Thinking Server. This module contains utility functions and helper classes. 
""" from .utils import setup_logging __all__ = [ # From utils "setup_logging", ] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/utils/utils.py: -------------------------------------------------------------------------------- ```python """Application utilities - completely rewritten for new architecture.""" from mcp_server_mas_sequential_thinking.infrastructure.logging_config import ( get_logger, setup_logging, ) # Re-export for convenience - no backward compatibility __all__ = ["get_logger", "setup_logging"] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/infrastructure/__init__.py: -------------------------------------------------------------------------------- ```python """Infrastructure module for MCP Sequential Thinking Server. This module contains infrastructure concerns including logging configuration and persistent memory. """ from .logging_config import LogTimer, MetricsLogger, get_logger, setup_logging from .persistent_memory import ( PersistentMemoryManager, create_persistent_memory, ) __all__ = [ "LogTimer", "MetricsLogger", # From persistent_memory "PersistentMemoryManager", "create_persistent_memory", # From logging_config "get_logger", "setup_logging", ] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/core/__init__.py: -------------------------------------------------------------------------------- ```python """Core domain module for MCP Sequential Thinking Server. This module contains the core domain logic including data models, types, and session management functionality. """ from .models import ThoughtData from .session import SessionMemory from .types import ( ConfigurationError, CoordinationPlan, ExecutionMode, ProcessingMetadata, TeamCreationError, ThoughtProcessingError, ) __all__ = [ "ConfigurationError", "CoordinationPlan", "ExecutionMode", "ProcessingMetadata", # From session "SessionMemory", "TeamCreationError", # From models "ThoughtData", # From types "ThoughtProcessingError", ] ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile FROM python:3.10-alpine # Set environment variables to prevent Python from writing .pyc files ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 # Install build dependencies RUN apk add --no-cache gcc musl-dev libffi-dev openssl-dev # Set working directory WORKDIR /app # Copy project files COPY pyproject.toml ./ COPY main.py ./ COPY README.md ./ # Upgrade pip and install hatchling build tool RUN pip install --upgrade pip && \ pip install hatchling # Build and install the project RUN pip install . # Command to run the MCP server CMD ["mcp-server-mas-sequential-thinking"] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/processors/__init__.py: -------------------------------------------------------------------------------- ```python """Processors module for MCP Sequential Thinking Server. This module contains processing logic including multi-thinking core functionality and multi-thinking processing implementation. 
""" from .multi_thinking_core import ( MultiThinkingAgentFactory, ThinkingDirection, create_thinking_agent, get_all_thinking_directions, get_thinking_timing, ) from .multi_thinking_processor import ( MultiThinkingProcessingResult, MultiThinkingSequentialProcessor, create_multi_thinking_step_output, ) __all__ = [ "MultiThinkingAgentFactory", # From multi_thinking_processor "MultiThinkingProcessingResult", "MultiThinkingSequentialProcessor", # From multi_thinking_core "ThinkingDirection", "create_multi_thinking_step_output", "create_thinking_agent", "get_all_thinking_directions", "get_thinking_timing", ] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/config/__init__.py: -------------------------------------------------------------------------------- ```python """Configuration module for MCP Sequential Thinking Server. This module contains all configuration-related functionality including constants, processing constants, and modernized configuration management. """ from .constants import ( ComplexityThresholds, DefaultTimeouts, DefaultValues, FieldLengthLimits, LoggingLimits, PerformanceMetrics, ProcessingDefaults, QualityThresholds, SecurityConstants, ValidationLimits, ) from .modernized_config import check_required_api_keys, get_model_config __all__ = [ # From constants "ComplexityThresholds", "DefaultTimeouts", "DefaultValues", "FieldLengthLimits", "LoggingLimits", "PerformanceMetrics", "ProcessingDefaults", "QualityThresholds", "SecurityConstants", "ValidationLimits", # From modernized_config "check_required_api_keys", "get_model_config", ] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/routing/__init__.py: -------------------------------------------------------------------------------- ```python """Routing module for MCP Sequential Thinking Server. This module contains routing and workflow logic including adaptive routing, workflow routing, optimization, and multi-thinking routing functionality. """ from .agno_workflow_router import ( MultiThinkingWorkflowResult, MultiThinkingWorkflowRouter, ) from .ai_complexity_analyzer import AIComplexityAnalyzer from .complexity_types import ComplexityLevel, ProcessingStrategy from .multi_thinking_router import ( MultiThinkingIntelligentRouter, create_multi_thinking_router, ) __all__ = [ # From ai_complexity_analyzer "AIComplexityAnalyzer", # From complexity_types "ComplexityLevel", # From multi_thinking_router "MultiThinkingIntelligentRouter", # From agno_workflow_router "MultiThinkingWorkflowResult", "MultiThinkingWorkflowRouter", "ProcessingStrategy", "create_multi_thinking_router", ] ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/__init__.py: -------------------------------------------------------------------------------- ```python """Services module for MCP Sequential Thinking Server. This module contains business logic services including server core, response processing, retry handling, and specialized processing services. 
""" from .context_builder import ContextBuilder from .processing_orchestrator import ProcessingOrchestrator from .response_formatter import ResponseExtractor, ResponseFormatter from .response_processor import ResponseProcessor from .retry_handler import TeamProcessingRetryHandler from .server_core import ( ServerConfig, ServerState, ThoughtProcessor, create_server_lifespan, create_validated_thought_data, ) from .workflow_executor import WorkflowExecutor __all__ = [ # From context_builder "ContextBuilder", # From processing_orchestrator "ProcessingOrchestrator", # From response_formatter "ResponseExtractor", "ResponseFormatter", # From response_processor "ResponseProcessor", # From server_core "ServerConfig", "ServerState", # From retry_handler "TeamProcessingRetryHandler", "ThoughtProcessor", # From workflow_executor "WorkflowExecutor", "create_server_lifespan", "create_validated_thought_data", ] ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: - llmProvider - deepseekApiKey properties: llmProvider: type: string default: deepseek description: "LLM Provider. Options: deepseek, groq, openrouter." deepseekApiKey: type: string description: API key for DeepSeek. Required if llmProvider is 'deepseek'. groqApiKey: type: string default: "" description: API key for Groq. Required if llmProvider is 'groq'. openrouterApiKey: type: string default: "" description: API key for OpenRouter. Required if llmProvider is 'openrouter'. exaApiKey: type: string default: "" description: API key for Exa to enable web research capabilities for thinking agents. commandFunction: # A JS function that produces the CLI command based on the given config to start the MCP on stdio. |- (config) => ({ command: 'mcp-server-mas-sequential-thinking', args: [], env: { LLM_PROVIDER: config.llmProvider, DEEPSEEK_API_KEY: config.deepseekApiKey, GROQ_API_KEY: config.groqApiKey, OPENROUTER_API_KEY: config.openrouterApiKey, EXA_API_KEY: config.exaApiKey } }) exampleConfig: llmProvider: deepseek deepseekApiKey: your_deepseek_api_key groqApiKey: "" openrouterApiKey: "" exaApiKey: your_exa_api_key ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/__init__.py: -------------------------------------------------------------------------------- ```python """MCP Sequential Thinking Server with AI-Powered Routing. A sophisticated Multi-Agent System (MAS) for sequential thinking with intelligent AI-based routing using advanced multi-thinking methodology. 
AI ROUTING FEATURES:
- AI-powered complexity analysis replacing rule-based scoring
- Intelligent thinking sequence selection (single, double, triple, full)
- Semantic understanding of philosophical and technical depth
- Automatic synthesis integration for complex thought processing

CORE ARCHITECTURE:
- AI-first routing with semantic complexity understanding
- Multi-thinking methodology with intelligent orchestration
- Unified agent factory eliminating code duplication
- Separated server concerns for better maintainability
- Optimized performance with async processing

Key Features:
- **AI Routing**: Semantic complexity analysis and strategy selection
- **Multi-Thinking Integration**: Factual, emotional, critical, optimistic, creative, and synthesis processing
- **Multi-Provider Support**: DeepSeek, Groq, OpenRouter, GitHub, Ollama
- **Philosophical Understanding**: Deep analysis for existential questions

Usage:
    # Server usage
    uv run mcp-server-mas-sequential-thinking

    # Direct processing
    from mcp_server_mas_sequential_thinking.services.server_core import ThoughtProcessor
    from mcp_server_mas_sequential_thinking.core.session import SessionMemory

    processor = ThoughtProcessor(session_memory)
    result = await processor.process_thought(thought_data)

AI Routing Strategies:
- Single Direction: Simple factual questions → quick focused processing
- Double/Triple Direction: Moderate complexity → focused thinking sequences
- Full Multi-Thinking: Complex philosophical questions → comprehensive analysis

Configuration:
    Environment variables:
    - LLM_PROVIDER: Primary provider (deepseek, groq, openrouter, github, ollama)
    - {PROVIDER}_API_KEY: API keys for providers
    - {PROVIDER}_{ENHANCED|STANDARD}_MODEL_ID: Model selection
    - AI_CONFIDENCE_THRESHOLD: Routing confidence threshold (default: 0.7)

Performance Benefits:
- Intelligent complexity assessment using AI understanding
- Automated synthesis processing for coherent responses
- Semantic depth recognition for philosophical questions
- Efficient thinking sequence selection based on content analysis
"""

__version__ = "0.7.0"


def get_version() -> str:
    """Get package version."""
    return __version__
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/complexity_types.py:
--------------------------------------------------------------------------------

```python
"""Core complexity analysis types and enums.

This file contains the essential types needed for complexity analysis,
extracted from the old adaptive_routing.py to support AI-first architecture.
""" from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: from mcp_server_mas_sequential_thinking.core.models import ThoughtData class ProcessingStrategy(Enum): """Processing strategy types.""" SINGLE_AGENT = "single_agent" MULTI_AGENT = "multi_agent" HYBRID = "hybrid" class ComplexityLevel(Enum): """Complexity levels for thought processing.""" SIMPLE = "simple" MODERATE = "moderate" COMPLEX = "complex" HIGHLY_COMPLEX = "highly_complex" @dataclass class ComplexityMetrics: """Metrics for thought complexity analysis.""" # Core metrics (used by both AI and fallback analyzers) complexity_score: float # Primary score (0-100) # Detailed breakdown (optional, for debugging/analysis) word_count: int = 0 sentence_count: int = 0 question_count: int = 0 technical_terms: int = 0 branching_references: int = 0 research_indicators: int = 0 analysis_depth: int = 0 philosophical_depth_boost: int = 0 # AI Analysis Results (critical for routing decisions) primary_problem_type: str = "GENERAL" # AI-determined problem type thinking_modes_needed: list[str] | None = None # AI-recommended thinking modes # Analysis metadata analyzer_type: str = "unknown" # "ai" or "basic" reasoning: str = "" # Why this score was assigned def __post_init__(self): if self.thinking_modes_needed is None: self.thinking_modes_needed = ["SYNTHESIS"] @dataclass class RoutingDecision: """Decision result from routing analysis.""" strategy: ProcessingStrategy complexity_level: ComplexityLevel complexity_score: float reasoning: str estimated_token_usage: tuple[int, int] # (min, max) estimated_cost: float specialist_recommendations: list[str] | None = None def __post_init__(self): if self.specialist_recommendations is None: self.specialist_recommendations = [] class ComplexityAnalyzer(ABC): """Abstract base class for complexity analysis.""" @abstractmethod async def analyze(self, thought_data: "ThoughtData") -> ComplexityMetrics: """Analyze thought complexity and return metrics.""" ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/core/session.py: -------------------------------------------------------------------------------- ```python """Session management for thought history and branching.""" import asyncio from dataclasses import dataclass, field from mcp_server_mas_sequential_thinking.config.constants import ValidationLimits from .models import ThoughtData from .types import BranchId, ThoughtNumber @dataclass class SessionMemory: """Manages thought history and branches with optimized lookups and DoS protection. Thread-safe implementation with async locks to prevent race conditions. 
""" thought_history: list[ThoughtData] = field(default_factory=list) branches: dict[BranchId, list[ThoughtData]] = field(default_factory=dict) # High-performance cache for O(1) thought lookups by number _thought_cache: dict[ThoughtNumber, ThoughtData] = field( default_factory=dict, init=False, repr=False ) # Thread safety lock for concurrent access protection _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False) # DoS protection constants as class attributes MAX_THOUGHTS_PER_SESSION: int = ValidationLimits.MAX_THOUGHTS_PER_SESSION MAX_BRANCHES_PER_SESSION: int = ValidationLimits.MAX_BRANCHES_PER_SESSION MAX_THOUGHTS_PER_BRANCH: int = ValidationLimits.MAX_THOUGHTS_PER_BRANCH async def add_thought(self, thought: ThoughtData) -> None: """Add thought with efficient DoS protection and optimized branch management. Thread-safe implementation with async lock to prevent race conditions. """ async with self._lock: # Early DoS protection checks with descriptive errors self._validate_session_limits(thought) # Update data structures atomically under lock self.thought_history.append(thought) self._thought_cache[thought.thoughtNumber] = thought # Handle branching with optimized setdefault pattern if thought.branchFromThought is not None and thought.branchId is not None: self.branches.setdefault(thought.branchId, []).append(thought) def _validate_session_limits(self, thought: ThoughtData) -> None: """Validate session limits with early exit optimization.""" # Primary session limit check if len(self.thought_history) >= self.MAX_THOUGHTS_PER_SESSION: raise ValueError( f"Session exceeds maximum {self.MAX_THOUGHTS_PER_SESSION} thoughts" ) # Branch-specific validations only if needed if not thought.branchId: return # Check total branch limit if len(self.branches) >= self.MAX_BRANCHES_PER_SESSION: raise ValueError( f"Session exceeds maximum {self.MAX_BRANCHES_PER_SESSION} branches" ) # Check individual branch limit if ( thought.branchId in self.branches and len(self.branches[thought.branchId]) >= self.MAX_THOUGHTS_PER_BRANCH ): raise ValueError( f"Branch '{thought.branchId}' exceeds maximum " f"{self.MAX_THOUGHTS_PER_BRANCH} thoughts" ) async def find_thought_content(self, thought_number: ThoughtNumber) -> str: """Find the content of a specific thought by number using optimized cache lookup.""" async with self._lock: # Use cache for O(1) lookup instead of O(n) search thought = self._thought_cache.get(thought_number) return thought.thought if thought else "Unknown thought" async def get_branch_summary(self) -> dict[BranchId, int]: """Get summary of all branches.""" async with self._lock: return { branch_id: len(thoughts) for branch_id, thoughts in self.branches.items() } def get_current_branch_id(self, thought: ThoughtData) -> str: """Get the current branch ID for a thought with improved logic.""" return thought.branchId or "main" ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-mas-sequential-thinking" version = "0.7.0" description = "MCP Agent Implementation for Sequential Thinking" readme = "README.md" requires-python = ">=3.10" authors = [ { name = "Frad LEE", email = "[email protected]" }, { name = "Alain Ivars", email = "[email protected]" }, ] dependencies = [ "agno>=2.0.5", "asyncio", "exa-py", "python-dotenv", "mcp", "groq", "ollama", "openrouter", "httpx[socks]>=0.28.1", "sqlalchemy", ] 
[project.optional-dependencies] dev = [ "pytest", "black", "isort", "mypy", "ruff", ] [project.scripts] mcp-server-mas-sequential-thinking = "mcp_server_mas_sequential_thinking.main:run" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/mcp_server_mas_sequential_thinking"] [dependency-groups] dev = [ "pytest>=8.4.1", ] [tool.ruff] target-version = "py310" line-length = 88 src = ["src", "tests"] [tool.ruff.lint] select = [ "E", # pycodestyle errors "W", # pycodestyle warnings "F", # pyflakes "I", # isort "N", # pep8-naming "D", # pydocstyle "UP", # pyupgrade "YTT", # flake8-2020 "ANN", # flake8-annotations "ASYNC", # flake8-async "S", # flake8-bandit "BLE", # flake8-blind-except "FBT", # flake8-boolean-trap "B", # flake8-bugbear "A", # flake8-builtins "COM", # flake8-commas "C4", # flake8-comprehensions "DTZ", # flake8-datetimez "T10", # flake8-debugger "DJ", # flake8-django "EM", # flake8-errmsg "EXE", # flake8-executable "FA", # flake8-future-annotations "ISC", # flake8-implicit-str-concat "ICN", # flake8-import-conventions "G", # flake8-logging-format "INP", # flake8-no-pep420 "PIE", # flake8-pie "T20", # flake8-print "PYI", # flake8-pyi "PT", # flake8-pytest-style "Q", # flake8-quotes "RSE", # flake8-raise "RET", # flake8-return "SLF", # flake8-self "SLOT", # flake8-slots "SIM", # flake8-simplify "TID", # flake8-tidy-imports "TCH", # flake8-type-checking "INT", # flake8-gettext "ARG", # flake8-unused-arguments "PTH", # flake8-use-pathlib "ERA", # eradicate "PD", # pandas-vet "PGH", # pygrep-hooks "PL", # pylint "TRY", # tryceratops "FLY", # flynt "NPY", # numpy "AIR", # airflow "PERF", # perflint "FURB", # refurb "LOG", # flake8-logging "RUF", # ruff-specific rules ] ignore = [ "D100", # Missing docstring in public module "D101", # Missing docstring in public class "D102", # Missing docstring in public method "D103", # Missing docstring in public function "D104", # Missing docstring in public package "D105", # Missing docstring in magic method "D107", # Missing docstring in __init__ "COM812", # Trailing comma missing "ISC001", # Implicitly concatenated string literals on one line "FBT001", # Boolean positional arg in function definition "FBT002", # Boolean default positional argument in function definition "S101", # Use of assert detected "PLR0913", # Too many arguments to function call "PLR2004", # Magic value used in comparison "TRY003", # Avoid specifying long messages outside the exception class "EM101", # Exception must not use a string literal, assign to variable first "EM102", # Exception must not use an f-string literal, assign to variable first ] [tool.ruff.lint.per-file-ignores] "tests/*" = ["S101", "PLR2004", "ANN", "D"] "__init__.py" = ["F401"] [tool.ruff.lint.pydocstyle] convention = "google" [tool.ruff.lint.isort] known-first-party = ["mcp_server_mas_sequential_thinking"] split-on-trailing-comma = true [tool.ruff.format] quote-style = "double" indent-style = "space" skip-magic-trailing-comma = false line-ending = "auto" ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/core/models.py: -------------------------------------------------------------------------------- ```python """Streamlined models with consolidated validation logic.""" from enum import Enum from typing import Any from pydantic import BaseModel, Field, model_validator from mcp_server_mas_sequential_thinking.config.constants import ( FieldLengthLimits, 
ValidationLimits,
)

from .types import BranchId, ThoughtNumber


class ThoughtType(Enum):
    """Types of thoughts in the sequential thinking process."""

    STANDARD = "standard"
    REVISION = "revision"
    BRANCH = "branch"


def _validate_thought_relationships(data: dict) -> None:
    """Validate thought relationships with optimized validation logic."""
    # Extract values once with modern dict methods
    branch_from_thought = data.get("branchFromThought")
    branch_id = data.get("branchId")
    current_number = data.get("thoughtNumber")

    # Collect validation errors efficiently
    errors = []

    # Relationship validation with guard clauses
    if branch_id is not None and branch_from_thought is None:
        errors.append("branchId requires branchFromThought to be set")

    # Numeric validation with early exit
    if current_number is None:
        if errors:
            raise ValueError("; ".join(errors))
        return

    # Validate numeric relationships
    if branch_from_thought is not None and branch_from_thought >= current_number:
        errors.append("branchFromThought must be less than current thoughtNumber")

    if errors:
        raise ValueError("; ".join(errors))


class ThoughtData(BaseModel):
    """Streamlined thought data model with consolidated validation."""

    model_config = {"validate_assignment": True, "frozen": True}

    # Core fields
    thought: str = Field(
        ...,
        min_length=FieldLengthLimits.MIN_STRING_LENGTH,
        description="Content of the thought",
    )

    # MCP API compatibility - camelCase field names required
    thoughtNumber: ThoughtNumber = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Sequence number starting from 1",
    )
    totalThoughts: int = Field(  # noqa: N815
        ...,
        ge=1,
        description="Estimated total thoughts",
    )
    nextThoughtNeeded: bool = Field(  # noqa: N815
        ..., description="Whether another thought is needed"
    )

    # Required workflow fields
    isRevision: bool = Field(  # noqa: N815
        ..., description="Whether this revises a previous thought"
    )
    branchFromThought: ThoughtNumber | None = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Thought number to branch from",
    )
    branchId: BranchId | None = Field(  # noqa: N815
        ..., description="Unique branch identifier"
    )
    needsMoreThoughts: bool = Field(  # noqa: N815
        ..., description="Whether more thoughts are needed beyond estimate"
    )

    @property
    def thought_type(self) -> ThoughtType:
        """Determine the type of thought based on field values."""
        if self.isRevision:
            return ThoughtType.REVISION
        if self.branchFromThought is not None:
            return ThoughtType.BRANCH
        return ThoughtType.STANDARD

    @model_validator(mode="before")
    @classmethod
    def validate_thought_data(cls, data: dict[str, Any]) -> dict[str, Any]:
        """Consolidated validation with simplified logic."""
        if isinstance(data, dict):
            _validate_thought_relationships(data)
        return data

    def format_for_log(self) -> str:
        """Format thought for logging with optimized type-specific formatting."""
        # Use match statement for modern Python pattern matching
        match self.thought_type:
            case ThoughtType.REVISION:
                prefix = (
                    f"Revision {self.thoughtNumber}/{self.totalThoughts} "
                    f"(revising #{self.branchFromThought})"
                )
            case ThoughtType.BRANCH:
                prefix = (
                    f"Branch {self.thoughtNumber}/{self.totalThoughts} "
                    f"(from #{self.branchFromThought}, ID: {self.branchId})"
                )
            case _:  # ThoughtType.STANDARD
                prefix = f"Thought {self.thoughtNumber}/{self.totalThoughts}"

        # Use multiline string formatting for better readability
        return (
            f"{prefix}\n"
            f"  Content: {self.thought}\n"
            f"  Next: {self.nextThoughtNeeded}, More: {self.needsMoreThoughts}"
        )
```
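
A minimal usage sketch for the `ThoughtData` model above (field values are invented; validation follows the validators shown in `models.py`):

```python
from mcp_server_mas_sequential_thinking.core.models import ThoughtData, ThoughtType

# A branch thought: branchId requires branchFromThought, and
# branchFromThought must be strictly less than thoughtNumber.
thought = ThoughtData(
    thought="Explore a caching-based alternative",
    thoughtNumber=3,
    totalThoughts=5,
    nextThoughtNeeded=True,
    isRevision=False,
    branchFromThought=2,
    branchId="cache-alternative",
    needsMoreThoughts=False,
)
assert thought.thought_type is ThoughtType.BRANCH
print(thought.format_for_log())
```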
-------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/context_builder.py: -------------------------------------------------------------------------------- ```python """Context building service for thought processing. This service handles building context-aware prompts from thought data, managing session history, and constructing appropriate inputs for processing. """ from mcp_server_mas_sequential_thinking.core import SessionMemory, ThoughtData from mcp_server_mas_sequential_thinking.utils import setup_logging logger = setup_logging() class ContextBuilder: """Service responsible for building context-aware prompts and managing thought context.""" def __init__(self, session: SessionMemory) -> None: """Initialize the context builder with session memory. Args: session: The session memory instance for accessing thought history """ self._session = session async def build_context_prompt(self, thought_data: ThoughtData) -> str: """Build context-aware input prompt with optimized string construction. This method creates contextual prompts based on thought type: - Revision thoughts include original content - Branch thoughts include origin content - Sequential thoughts use basic format Args: thought_data: The thought data to build context for Returns: Formatted prompt string with appropriate context """ # Pre-calculate base components for efficiency base = f"Process Thought #{thought_data.thoughtNumber}:\n" content = f'\nThought Content: "{thought_data.thought}"' # Add context using pattern matching with optimized string building match thought_data: case ThoughtData(isRevision=True, branchFromThought=revision_num) if ( revision_num ): original = await self._find_thought_content_safe(revision_num) context = f'**REVISION of Thought #{revision_num}** (Original: "{original}")\n' return f"{base}{context}{content}" case ThoughtData(branchFromThought=branch_from, branchId=branch_id) if ( branch_from and branch_id ): origin = await self._find_thought_content_safe(branch_from) context = f'**BRANCH (ID: {branch_id}) from Thought #{branch_from}** (Origin: "{origin}")\n' return f"{base}{context}{content}" case _: return f"{base}{content}" async def _find_thought_content_safe(self, thought_number: int) -> str: """Safely find thought content with error handling. Args: thought_number: The thought number to find Returns: The thought content or a placeholder if not found """ try: return await self._session.find_thought_content(thought_number) except Exception: return "[not found]" async def log_context_building( self, thought_data: ThoughtData, input_prompt: str ) -> None: """Log context building details for debugging and monitoring. 
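
        Example of a built prompt this method logs (illustrative, revision case):

            Process Thought #4:
            **REVISION of Thought #2** (Original: "original text")

            Thought Content: "revised text"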
Args: thought_data: The thought data being processed input_prompt: The built prompt """ logger.info("📝 CONTEXT BUILDING:") if thought_data.isRevision and thought_data.branchFromThought: logger.info( " Type: Revision of thought #%s", thought_data.branchFromThought ) original = await self._find_thought_content_safe( thought_data.branchFromThought ) logger.info(" Original thought: %s", original) elif thought_data.branchFromThought and thought_data.branchId: logger.info( " Type: Branch '%s' from thought #%s", thought_data.branchId, thought_data.branchFromThought, ) origin = await self._find_thought_content_safe( thought_data.branchFromThought ) logger.info(" Branch origin: %s", origin) else: logger.info(" Type: Sequential thought #%s", thought_data.thoughtNumber) logger.info(" Session thoughts: %d total", len(self._session.thought_history)) logger.info(" Input thought: %s", thought_data.thought) logger.info(" Built prompt length: %d chars", len(input_prompt)) logger.info(" Built prompt:\n%s", input_prompt) # Use field length limits constant if available separator_length = 60 # Default fallback try: from mcp_server_mas_sequential_thinking.config import FieldLengthLimits separator_length = FieldLengthLimits.SEPARATOR_LENGTH except ImportError: pass logger.info(" %s", "=" * separator_length) def create_simplified_prompt(self, input_prompt: str) -> str: """Create a simplified prompt for single-agent processing. Args: input_prompt: The original input prompt Returns: Simplified prompt optimized for single-agent processing """ return f"""Process this thought efficiently: {input_prompt} Provide a focused response with clear guidance for the next step.""" ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/retry_handler.py: -------------------------------------------------------------------------------- ```python """Retry handling utilities for robust processing.""" import asyncio import time from collections.abc import Callable from dataclasses import dataclass from typing import Any, TypeVar from mcp_server_mas_sequential_thinking.config.constants import ( DefaultTimeouts, PerformanceMetrics, ) from mcp_server_mas_sequential_thinking.core.types import ThoughtProcessingError from mcp_server_mas_sequential_thinking.infrastructure.logging_config import get_logger logger = get_logger(__name__) T = TypeVar("T") @dataclass class RetryConfig: """Configuration for retry operations.""" max_attempts: int = DefaultTimeouts.MAX_RETRY_ATTEMPTS sleep_duration: float = PerformanceMetrics.RETRY_SLEEP_DURATION exponential_base: float = DefaultTimeouts.RETRY_EXPONENTIAL_BASE use_exponential_backoff: bool = False class RetryHandler: """Handles retry logic with configurable strategies.""" def __init__(self, config: RetryConfig | None = None) -> None: """Initialize retry handler with configuration.""" self.config = config or RetryConfig() async def execute_with_retry( self, operation: Callable[[], Any], operation_name: str, context_info: dict | None = None, ) -> Any: """Execute operation with retry logic.""" last_exception = None max_retries = self.config.max_attempts for retry_count in range(max_retries + 1): try: self._log_attempt( retry_count, max_retries, operation_name, context_info ) start_time = time.time() result = await operation() processing_time = time.time() - start_time self._log_success(operation_name, processing_time) return result except Exception as e: last_exception = e self._log_error(retry_count, max_retries, 
operation_name, e) if retry_count < max_retries: await self._wait_before_retry(retry_count) else: self._log_exhaustion(max_retries, operation_name) raise ThoughtProcessingError( f"{operation_name} failed after {max_retries + 1} attempts: {e}" ) from e # Should never reach here, but provide safety raise ThoughtProcessingError( f"Unexpected error in retry logic for {operation_name}" ) from last_exception def _log_attempt( self, retry_count: int, max_retries: int, operation_name: str, context_info: dict | None, ) -> None: """Log retry attempt information.""" logger.info( f"Processing attempt {retry_count + 1}/{max_retries + 1}: {operation_name}" ) if context_info: for key, value in context_info.items(): logger.info(f" {key}: {value}") def _log_success(self, operation_name: str, processing_time: float) -> None: """Log successful operation completion.""" logger.info(f"✅ {operation_name} completed in {processing_time:.3f}s") def _log_error( self, retry_count: int, max_retries: int, operation_name: str, error: Exception ) -> None: """Log error information.""" logger.error(f"{operation_name} error on attempt {retry_count + 1}: {error}") if retry_count < max_retries: logger.info(f"Retrying... ({retry_count + 1}/{max_retries})") def _log_exhaustion(self, max_retries: int, operation_name: str) -> None: """Log retry exhaustion.""" logger.error(f"All retry attempts exhausted for {operation_name}") async def _wait_before_retry(self, retry_count: int) -> None: """Wait before retry with optional exponential backoff.""" if self.config.use_exponential_backoff: wait_time = self.config.sleep_duration * ( self.config.exponential_base**retry_count ) else: wait_time = self.config.sleep_duration await asyncio.sleep(wait_time) class TeamProcessingRetryHandler(RetryHandler): """Specialized retry handler for team processing operations.""" def __init__(self) -> None: """Initialize with team processing specific configuration.""" config = RetryConfig( max_attempts=DefaultTimeouts.MAX_RETRY_ATTEMPTS, sleep_duration=PerformanceMetrics.RETRY_SLEEP_DURATION, use_exponential_backoff=True, ) super().__init__(config) async def execute_team_processing( self, team_operation: Callable[[], Any], team_info: dict, complexity_level: str ) -> Any: """Execute team processing with specialized retry logic.""" context_info = { "complexity": complexity_level, "team": team_info.get("name", "unknown"), "agents": team_info.get("member_count", 0), "leader": team_info.get("leader_model", "unknown"), } return await self.execute_with_retry( team_operation, "MULTI-AGENT TEAM PROCESSING", context_info ) ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/response_processor.py: -------------------------------------------------------------------------------- ```python """Response processing utilities for consistent response handling.""" from dataclasses import dataclass from typing import Any from mcp_server_mas_sequential_thinking.infrastructure.logging_config import get_logger logger = get_logger(__name__) @dataclass class ProcessedResponse: """Standardized response data structure.""" content: str raw_response: Any processing_time: float | None = None metadata: dict | None = None class ResponseExtractor: """Handles extraction of content from various response types.""" @staticmethod def extract_content(response: Any) -> str: """Extract string content from various response formats.""" if response is None: return "" # Handle string responses directly if 
isinstance(response, str): return response # Handle RunOutput from Agno framework if hasattr(response, "content"): content = response.content if isinstance(content, str): return content if isinstance(content, dict): # Extract from dict-based content return ResponseExtractor._extract_from_dict(content) if hasattr(content, "__str__"): return str(content) # Handle dictionary responses if isinstance(response, dict): return ResponseExtractor._extract_from_dict(response) # Handle objects with text/message attributes for attr in ["text", "message", "result", "output"]: if hasattr(response, attr): value = getattr(response, attr) if isinstance(value, str): return value # Fallback to string conversion logger.warning(f"Unknown response type {type(response)}, converting to string") return str(response) @staticmethod def _extract_from_dict(content_dict: dict) -> str: """Extract content from dictionary-based responses.""" # Common content keys in order of preference content_keys = ["content", "text", "message", "result", "output", "response"] for key in content_keys: if key in content_dict: value = content_dict[key] if isinstance(value, str): return value if isinstance(value, dict) and "result" in value: # Handle nested result structures return str(value["result"]) # If no standard key found, try to get first string value for value in content_dict.values(): if isinstance(value, str) and value.strip(): return value # Fallback to string representation return str(content_dict) class ResponseProcessor: """Comprehensive response processing with logging and validation.""" def __init__(self) -> None: """Initialize response processor.""" self.extractor = ResponseExtractor() def process_response( self, response: Any, processing_time: float | None = None, context: str = "processing", ) -> ProcessedResponse: """Process response with extraction, validation, and logging.""" content = self.extractor.extract_content(response) # Validate content if not content or not content.strip(): logger.warning(f"Empty response content in {context}") content = f"[Empty response from {context}]" # Create metadata metadata = { "context": context, "response_type": type(response).__name__, "content_length": len(content), "has_content": bool(content.strip()), } processed = ProcessedResponse( content=content, raw_response=response, processing_time=processing_time, metadata=metadata, ) self._log_response_details(processed, context) return processed def _log_response_details(self, processed: ProcessedResponse, context: str) -> None: """Log detailed response information.""" logger.info(f"📝 {context.upper()} RESPONSE:") if processed.metadata: logger.info(f" Type: {processed.metadata.get('response_type', 'unknown')}") logger.info( f" Length: {processed.metadata.get('content_length', 0)} chars" ) else: logger.info(" Type: unknown") logger.info(f" Length: {len(processed.content)} chars") if processed.processing_time: logger.info(f" Processing time: {processed.processing_time:.3f}s") # Log content preview (first 100 chars) content_preview = processed.content[:100] if len(processed.content) > 100: content_preview += "..." logger.info(f" Preview: {content_preview}") class ResponseFormatter: """Formats responses for consistent output.""" @staticmethod def format_for_client(processed: ProcessedResponse) -> str: """Format processed response for client consumption.""" content = processed.content.strip() # Ensure content ends with appropriate punctuation if content and not content.endswith((".", "!", "?", ":", ";")): content += "." 
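        # e.g. "use a cache" -> "use a cache."; endings like "done!" or
        # "step 1:" already match the tuple above and pass through unchanged.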
return content @staticmethod def format_with_metadata(processed: ProcessedResponse) -> dict: """Format response with metadata for debugging.""" return { "content": processed.content, "metadata": processed.metadata, "processing_time": processed.processing_time, } ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # Changelog All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] ## [0.7.0] - 2025-09-24 ### Added - Parallel execution for thinking agents to improve processing performance - Comprehensive Mermaid diagrams in documentation showing parallel processing flows - Detailed agent descriptions in README files with multi-dimensional thinking methodology - Comparison table with original TypeScript version highlighting architectural differences ### Changed - **PERFORMANCE**: Converted non-synthesis agents to run in parallel using asyncio.gather for significant speed improvements - **GROQ PROVIDER**: Updated Groq provider to use OpenAI GPT-OSS models (openai/gpt-oss-120b for enhanced, openai/gpt-oss-20b for standard) - Complete restructure of README files with cleaner formatting and better organization - Improved documentation clarity by removing all emoji characters from codebase and documentation ### Fixed - Resolved MetricsLogger import error that was preventing server startup - Fixed missing MetricsLogger class implementation in logging configuration - Corrected Mermaid diagram syntax errors in README files - Removed references to non-existent PerformanceTracker class ## [0.5.0] - 2025-09-17 ### Added - Comprehensive TDD test coverage for refactoring and quality improvement - Default settings and processing strategy enum for enhanced configuration - Adaptive architecture with cost optimization capabilities - Comprehensive test infrastructure and unit tests - Magic number extraction to constants for better maintainability ### Changed - **BREAKING**: Migration to Agno v2.0 with architectural updates (~10,000x faster agent creation, ~50x less memory usage) - Upgraded Agno to version 2.0.5 with enhanced agent features - Reorganized types module and cleaned duplicates for better structure - Modernized codebase with enhanced type safety and annotations - Adopted src layout for Python project structure following best practices - Optimized code structure and performance across modules ### Fixed - Resolved mypy type checking errors across all modules - Comprehensive security and quality improvements - Updated minimum Agno version to 2.0.5 for compatibility ### Documentation - Updated CLAUDE.md with Agno v2.0 migration details and corrected commands - Enhanced guidance for src layout and development requirements - Improved test documentation and GitHub provider information ## [0.4.1] - 2025-08-06 ### Fixed - app_lifespan function signature for FastMCP compatibility ### Changed - Restructured main.py with modular architecture for better maintainability ## [0.4.0] - 2025-08-06 ### Added - Support for Kimi K2 model via OpenRouter integration - Enhanced model provider options and configuration flexibility ### Changed - CHANGELOG.md following Keep a Changelog standards - Moved changelog from README.md to dedicated CHANGELOG.md file ## [0.3.0] - 2025-08-01 ### Added - 
Support for Ollama FULL LOCAL (no API key needed, but requires Ollama installed and running locally) - Local LLM inference capabilities through Ollama integration - Enhanced model configuration options for local deployment - MseeP.ai security assessment badge ### Changed - Restored DeepSeek as default LLM provider - Improved package naming and configuration - Updated dependencies to support local inference - Enhanced agent memory management (disabled for individual agents) ### Fixed - Package naming issues in configuration - Dependency conflicts resolved - Merge conflicts between branches ## [0.2.3] - 2025-04-22 ### Changed - Updated version alignment in project configuration and lock file ## [0.2.2] - 2025-04-10 ### Changed - Default agent model ID for DeepSeek changed from `deepseek-reasoner` to `deepseek-chat` - Improved model selection recommendations ## [0.2.1] - 2025-04-10 ### Changed - Model selection recommendations updated in documentation - Enhanced guidance for coordinator vs specialist model selection ## [0.2.0] - 2025-04-06 ### Added - Major refactoring of sequential thinking team structure - Enhanced coordination logic - Improved JSON output format - LLM configuration and model selection enhancements ### Changed - Agent model IDs updated for better performance - Project structure improvements ## [0.1.3] - 2025-04-06 ### Changed - Project entry point script from `main:main` to `main:run` - Updated documentation for improved user guidance - Cleaned up dependencies in lock file ## [0.1.0] - 2025-04-06 ### Added - Initial project structure and configuration files - Multi-Agent System (MAS) architecture using Agno framework - Sequential thinking tool with coordinated specialist agents - Support for multiple LLM providers (DeepSeek, Groq, OpenRouter) - Pydantic validation for robust data integrity - Integration with external tools (Exa for research) - Structured logging with file and console output - Support for thought revisions and branching - MCP server implementation with FastMCP - Distributed intelligence across specialized agents [Unreleased]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.7.0...HEAD [0.7.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.5.0...v0.7.0 [0.5.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.4.1...v0.5.0 [0.4.1]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.4.0...v0.4.1 [0.4.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.3.0...v0.4.0 [0.3.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.3...v0.3.0 [0.2.3]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.2...v0.2.3 [0.2.2]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.1...v0.2.2 [0.2.1]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.0...v0.2.1 [0.2.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.1.3...v0.2.0 [0.1.3]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.1.0...v0.1.3 [0.1.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/releases/tag/v0.1.0 ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/workflow_executor.py: -------------------------------------------------------------------------------- ```python """Workflow execution service for multi-thinking processing. 
This service handles the execution of multi-thinking workflows, managing workflow routing and coordination of processing strategies. """ import time from typing import TYPE_CHECKING, Any from mcp_server_mas_sequential_thinking.config.constants import PerformanceMetrics from mcp_server_mas_sequential_thinking.core import SessionMemory, ThoughtData from mcp_server_mas_sequential_thinking.utils import setup_logging if TYPE_CHECKING: from mcp_server_mas_sequential_thinking.routing import MultiThinkingWorkflowResult logger = setup_logging() class WorkflowExecutor: """Service responsible for executing multi-thinking workflows.""" def __init__(self, session: SessionMemory) -> None: """Initialize the workflow executor with session memory. Args: session: The session memory instance for accessing team and context """ self._session = session self._agno_router: Any = None self._initialize_multi_thinking_workflow() def _initialize_multi_thinking_workflow(self) -> None: """Initialize multi-thinking workflow router. Uses dynamic import to avoid circular dependency issues. """ logger.info("Initializing Multi-Thinking Workflow Router") # Dynamic import to avoid circular dependency from mcp_server_mas_sequential_thinking.routing import ( # noqa: PLC0415 MultiThinkingWorkflowRouter, ) self._agno_router = MultiThinkingWorkflowRouter() logger.info("✅ Multi-Thinking Workflow Router ready") async def execute_workflow( self, thought_data: ThoughtData, input_prompt: str, start_time: float ) -> tuple[str, "MultiThinkingWorkflowResult", float]: """Execute multi-thinking workflow for the given thought. Args: thought_data: The thought data to process input_prompt: The context-aware input prompt start_time: Processing start time for metrics Returns: Tuple of (final_response, workflow_result, total_time) """ # Execute multi-thinking workflow workflow_result: MultiThinkingWorkflowResult = ( await self._agno_router.process_thought_workflow(thought_data, input_prompt) ) total_time = time.time() - start_time return workflow_result.content, workflow_result, total_time def log_workflow_completion( self, thought_data: ThoughtData, workflow_result: "MultiThinkingWorkflowResult", total_time: float, final_response: str, ) -> None: """Log workflow completion with multi-thinking specific metrics. 
Args: thought_data: The processed thought data workflow_result: The workflow execution result total_time: Total processing time final_response: The final formatted response """ # Basic completion info completion_metrics = { f"Thought #{thought_data.thoughtNumber}": "processed successfully", "Strategy": workflow_result.strategy_used, "Complexity Score": f"{workflow_result.complexity_score:.1f}/100", "Step": workflow_result.step_name, "Processing time": f"{workflow_result.processing_time:.3f}s", "Total time": f"{total_time:.3f}s", "Response length": f"{len(final_response)} chars", } self._log_metrics_block( "🧠 MULTI-THINKING WORKFLOW COMPLETION:", completion_metrics ) self._log_separator() # Performance metrics execution_consistency = self._calculate_execution_consistency( workflow_result.strategy_used != "error_fallback" ) efficiency_score = self._calculate_efficiency_score( workflow_result.processing_time ) performance_metrics = { "Execution Consistency": execution_consistency, "Efficiency Score": efficiency_score, "Response Length": f"{len(final_response)} chars", "Strategy Executed": workflow_result.strategy_used, } self._log_metrics_block("📊 WORKFLOW PERFORMANCE METRICS:", performance_metrics) def _log_metrics_block(self, title: str, metrics: dict[str, Any]) -> None: """Log a formatted metrics block. Args: title: The title for the metrics block metrics: Dictionary of metrics to log """ logger.info("%s", title) for key, value in metrics.items(): if isinstance(value, float): logger.info(" %s: %.2f", key, value) else: logger.info(" %s: %s", key, value) def _log_separator(self, length: int = 60) -> None: """Log a separator line. Args: length: Length of the separator line """ # Use performance metrics constant length = PerformanceMetrics.SEPARATOR_LENGTH logger.info(" %s", "=" * length) def _calculate_efficiency_score(self, processing_time: float) -> float: """Calculate efficiency score using standard metrics. Args: processing_time: The processing time in seconds Returns: Efficiency score between 0 and 1 """ # Use constants from PerformanceMetrics perfect_score = PerformanceMetrics.PERFECT_EFFICIENCY_SCORE threshold = PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD minimum_score = PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE return ( perfect_score if processing_time < threshold else max(minimum_score, threshold / processing_time) ) def _calculate_execution_consistency(self, success_indicator: bool) -> float: """Calculate execution consistency using standard metrics. Args: success_indicator: Whether execution was successful Returns: Execution consistency score """ # Use constants from PerformanceMetrics perfect_consistency = PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY default_consistency = PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY return perfect_consistency if success_indicator else default_consistency ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/infrastructure/logging_config.py: -------------------------------------------------------------------------------- ```python """Streamlined logging configuration based on Python best practices. Replaces complex 985-line implementation with focused, performance-optimized approach. """ import logging import logging.handlers import os import sys from pathlib import Path def setup_logging(level: str | None = None) -> logging.Logger: """Setup streamlined logging with environment-based configuration. Args: level: Log level override. 
If None, uses LOG_LEVEL env var or defaults to INFO. Returns: Configured logger instance for the application. """ # Determine log level from environment or parameter log_level = level or os.getenv("LOG_LEVEL", "INFO") try: numeric_level = getattr(logging, log_level.upper()) except AttributeError: numeric_level = logging.INFO # Create logs directory log_dir = Path.home() / ".sequential_thinking" / "logs" log_dir.mkdir(parents=True, exist_ok=True) # Configure root logger for this application logger = logging.getLogger("sequential_thinking") logger.setLevel(numeric_level) # Clear any existing handlers to avoid duplicates logger.handlers.clear() # Console handler for development/debugging if os.getenv("ENVIRONMENT") != "production": console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(numeric_level) console_formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # File handler with rotation for persistent logging log_file = log_dir / "sequential_thinking.log" file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=5 * 1024 * 1024, # 5MB backupCount=3, encoding="utf-8", ) file_handler.setLevel(numeric_level) file_formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s" ) file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) # Prevent propagation to root logger to avoid duplicates logger.propagate = False return logger def get_logger(name: str | None = None) -> logging.Logger: """Get a logger instance with consistent configuration. Args: name: Logger name. If None, uses calling module's name. Returns: Logger instance. """ if name is None: # Get caller's module name for better traceability import inspect frame = inspect.currentframe().f_back name = frame.f_globals.get("__name__", "sequential_thinking") return logging.getLogger(name) def log_performance_metric( logger: logging.Logger, operation: str, duration: float, **kwargs ) -> None: """Log performance metrics in consistent format. Uses lazy evaluation to avoid string formatting overhead. 
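
    Example (illustrative):
        log_performance_metric(logger, "team_processing", 1.23, tokens=512)
        # logs: "Performance: team_processing completed in 1.23s (tokens=512)"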
""" if logger.isEnabledFor(logging.INFO): extras = ", ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else "" logger.info( "Performance: %s completed in %.2fs%s", operation, duration, f" ({extras})" if extras else "", ) def log_routing_decision( logger: logging.Logger, strategy: str, complexity: float, reasoning: str = "" ) -> None: """Log AI routing decisions with consistent structure.""" logger.info( "AI Routing: strategy=%s, complexity=%.1f%s", strategy, complexity, f", reason={reasoning}" if reasoning else "", ) def log_thought_processing( logger: logging.Logger, stage: str, thought_number: int, thought_length: int = 0, **context, ) -> None: """Log thought processing stages with structured data.""" if logger.isEnabledFor(logging.INFO): ctx_str = ", ".join(f"{k}={v}" for k, v in context.items()) if context else "" logger.info( "Thought Processing: stage=%s, number=%d, length=%d%s", stage, thought_number, thought_length, f", {ctx_str}" if ctx_str else "", ) class LogTimer: """Context manager for timing operations with automatic logging.""" def __init__( self, logger: logging.Logger, operation: str, level: int = logging.INFO ) -> None: self.logger = logger self.operation = operation self.level = level self.start_time = None def __enter__(self): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Starting: %s", self.operation) import time self.start_time = time.time() return self def __exit__(self, exc_type, exc_val, exc_tb): import time duration = time.time() - self.start_time if exc_type is None: if self.logger.isEnabledFor(self.level): self.logger.log( self.level, "Completed: %s (%.2fs)", self.operation, duration ) else: self.logger.error( "Failed: %s (%.2fs) - %s", self.operation, duration, exc_val ) # Legacy compatibility - maintain existing function names but with simplified implementation def create_logger(name: str) -> logging.Logger: """Legacy compatibility function.""" return get_logger(name) def configure_logging(level: str = "INFO") -> logging.Logger: """Legacy compatibility function.""" return setup_logging(level) class MetricsLogger: """Simple metrics logger for structured logging output.""" def __init__(self, logger_name: str = "sequential_thinking") -> None: """Initialize metrics logger with specified logger name.""" self.logger = logging.getLogger(logger_name) def log_metrics_block(self, title: str, metrics: dict) -> None: """Log a block of metrics with a title. Args: title: Block title to display metrics: Dictionary of metrics to log """ if not self.logger.isEnabledFor(logging.INFO): return self.logger.info("%s", title) for key, value in metrics.items(): self.logger.info(" %s: %s", key, value) def log_separator(self, length: int = 60) -> None: """Log a separator line. Args: length: Length of the separator line """ if self.logger.isEnabledFor(logging.INFO): self.logger.info("-" * length) ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/response_formatter.py: -------------------------------------------------------------------------------- ```python """Response formatting service for thought processing. This service handles formatting and synthesizing responses from various processing modes, extracting content from Agno RunOutput objects, and preparing final responses. 
""" from mcp_server_mas_sequential_thinking.core import ThoughtData from mcp_server_mas_sequential_thinking.utils import setup_logging logger = setup_logging() class ResponseFormatter: """Service responsible for formatting and synthesizing responses.""" def __init__(self) -> None: """Initialize the response formatter.""" def format_response(self, content: str, thought_data: ThoughtData) -> str: """Format response for MCP - clean content without additional guidance. MCP servers should return clean content and let the AI decide next steps. Args: content: The raw response content to format thought_data: The thought data context Returns: Formatted response string """ # MCP servers should return clean content, let AI decide next steps final_response = content # Log response formatting details self._log_response_formatting(content, thought_data, final_response) return final_response def extract_response_content(self, response) -> str: """Extract clean content from Agno RunOutput objects. Handles various response types and extracts text content properly. Args: response: The response object from Agno processing Returns: Extracted text content """ # Import ResponseExtractor to handle the extraction from mcp_server_mas_sequential_thinking.services.response_processor import ( ResponseExtractor, ) return ResponseExtractor.extract_content(response) def _log_response_formatting( self, content: str, thought_data: ThoughtData, final_response: str ) -> None: """Log response formatting details for debugging and monitoring. Args: content: The original response content thought_data: The thought data context final_response: The final formatted response """ logger.info("📤 RESPONSE FORMATTING:") logger.info(f" Original content length: {len(content)} chars") logger.info(f" Next needed: {thought_data.nextThoughtNeeded}") logger.info(f" Final response length: {len(final_response)} chars") logger.info(f" Final response:\n{final_response}") # Use field length limits constant if available separator_length = 60 # Default fallback try: from mcp_server_mas_sequential_thinking.config import FieldLengthLimits separator_length = FieldLengthLimits.SEPARATOR_LENGTH except ImportError: pass logger.info(f" {'=' * separator_length}") def log_input_details( self, input_prompt: str, context_description: str = "input" ) -> None: """Log input details with consistent formatting. Args: input_prompt: The input prompt to log context_description: Description of the context """ logger.info(f" Input length: {len(input_prompt)} chars") logger.info(f" Full {context_description}:\\n{input_prompt}") # Use performance metrics constant if available separator_length = 60 # Default fallback try: from mcp_server_mas_sequential_thinking.config import PerformanceMetrics separator_length = PerformanceMetrics.SEPARATOR_LENGTH except ImportError: pass logger.info(f" {'=' * separator_length}") def log_output_details( self, response_content: str, processing_time: float, context_description: str = "response", ) -> None: """Log output details with consistent formatting. 
        Args:
            response_content: The response content to log
            processing_time: Processing time in seconds
            context_description: Description of the context
        """
        logger.info(f"  Processing time: {processing_time:.3f}s")
        logger.info(f"  Output length: {len(response_content)} chars")
        logger.info(f"  Full {context_description}:\n{response_content}")

        # Use performance metrics constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import PerformanceMetrics

            separator_length = PerformanceMetrics.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")


class ResponseExtractor:
    """Utility class for extracting content from various response types."""

    @staticmethod
    def extract_content(response) -> str:
        """Extract clean content from Agno RunOutput objects.

        This method handles various response formats and extracts the actual
        text content that should be returned to the user.

        Args:
            response: The response object from agent processing

        Returns:
            Extracted text content as string
        """
        # Handle string responses directly
        if isinstance(response, str):
            return response.strip()

        # Handle Agno RunOutput objects
        if hasattr(response, "content"):
            content = response.content
            if isinstance(content, str):
                return content.strip()
            if isinstance(content, list):
                # Handle list of content items
                text_parts = []
                for item in content:
                    if isinstance(item, str):
                        text_parts.append(item)
                    elif hasattr(item, "text"):
                        text_parts.append(item.text)
                    elif hasattr(item, "content"):
                        text_parts.append(str(item.content))
                return "\n".join(text_parts).strip()

        # Handle objects with text attribute
        if hasattr(response, "text"):
            return response.text.strip()

        # Handle objects with message attribute
        if hasattr(response, "message"):
            message = response.message
            if isinstance(message, str):
                return message.strip()
            if hasattr(message, "content"):
                return str(message.content).strip()

        # Fallback: convert to string
        return str(response).strip()
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/thought_processor_refactored.py:
--------------------------------------------------------------------------------

```python
"""Refactored ThoughtProcessor using dependency injection and single responsibility services.

This module provides a clean, maintainable implementation of thought processing
that delegates responsibilities to specialized services following clean architecture.
"""

import time

from mcp_server_mas_sequential_thinking.config import ProcessingDefaults
from mcp_server_mas_sequential_thinking.core import (
    ProcessingMetadata,
    SessionMemory,
    ThoughtData,
    ThoughtProcessingError,
)
from mcp_server_mas_sequential_thinking.infrastructure import (
    MetricsLogger,
)
from mcp_server_mas_sequential_thinking.utils import setup_logging

from .context_builder import ContextBuilder
from .processing_orchestrator import ProcessingOrchestrator
from .response_formatter import ResponseFormatter
from .response_processor import ResponseProcessor
from .retry_handler import TeamProcessingRetryHandler
from .workflow_executor import WorkflowExecutor

logger = setup_logging()


class ThoughtProcessor:
    """Refactored thought processor using dependency injection and clean architecture.

    This class orchestrates thought processing by delegating specific responsibilities
    to specialized services, maintaining a clean separation of concerns.
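
    Example (illustrative):
        processor = ThoughtProcessor(session)
        response = await processor.process_thought(thought_data)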
""" __slots__ = ( "_context_builder", "_metrics_logger", "_performance_tracker", "_processing_orchestrator", "_response_formatter", "_session", "_workflow_executor", ) def __init__(self, session: SessionMemory) -> None: """Initialize the thought processor with dependency injection. Args: session: The session memory instance for accessing team and context """ self._session = session # Initialize core services with dependency injection self._context_builder = ContextBuilder(session) self._workflow_executor = WorkflowExecutor(session) self._response_formatter = ResponseFormatter() # Initialize supporting services response_processor = ResponseProcessor() retry_handler = TeamProcessingRetryHandler() self._processing_orchestrator = ProcessingOrchestrator( session, response_processor, retry_handler ) # Initialize monitoring services self._metrics_logger = MetricsLogger() logger.info("ThoughtProcessor initialized with specialized services") async def process_thought(self, thought_data: ThoughtData) -> str: """Process a thought through the appropriate workflow with comprehensive error handling. This is the main public API method that maintains backward compatibility while using the new service-based architecture internally. Args: thought_data: The thought data to process Returns: Processed thought response Raises: ThoughtProcessingError: If processing fails """ try: return await self._process_thought_internal(thought_data) except Exception as e: error_msg = f"Failed to process {thought_data.thought_type.value} thought #{thought_data.thoughtNumber}: {e}" logger.error(error_msg, exc_info=True) metadata: ProcessingMetadata = { "error_count": ProcessingDefaults.ERROR_COUNT_INITIAL, "retry_count": ProcessingDefaults.RETRY_COUNT_INITIAL, "processing_time": ProcessingDefaults.PROCESSING_TIME_INITIAL, } raise ThoughtProcessingError(error_msg, metadata) from e async def _process_thought_internal(self, thought_data: ThoughtData) -> str: """Internal thought processing logic using specialized services. Args: thought_data: The thought data to process Returns: Processed thought response """ start_time = time.time() # Log thought data and add to session (now async for thread safety) self._log_thought_data(thought_data) await self._session.add_thought(thought_data) # Build context using specialized service (now async for thread safety) input_prompt = await self._context_builder.build_context_prompt(thought_data) await self._context_builder.log_context_building(thought_data, input_prompt) # Execute Multi-Thinking workflow using specialized service ( content, workflow_result, total_time, ) = await self._workflow_executor.execute_workflow( thought_data, input_prompt, start_time ) # Format response using specialized service final_response = self._response_formatter.format_response(content, thought_data) # Log workflow completion self._workflow_executor.log_workflow_completion( thought_data, workflow_result, total_time, final_response ) return final_response def _log_thought_data(self, thought_data: ThoughtData) -> None: """Log comprehensive thought data information using centralized logger. 
        Args:
            thought_data: The thought data to log
        """
        basic_info = {
            f"Thought #{thought_data.thoughtNumber}": f"{thought_data.thoughtNumber}/{thought_data.totalThoughts}",
            "Type": thought_data.thought_type.value,
            "Content": thought_data.thought,
            "Next needed": thought_data.nextThoughtNeeded,
            "Needs more": thought_data.needsMoreThoughts,
        }

        # Add conditional fields
        if thought_data.isRevision:
            basic_info["Is revision"] = (
                f"True (revises thought #{thought_data.branchFromThought})"
            )
        if thought_data.branchFromThought:
            basic_info["Branch from"] = (
                f"#{thought_data.branchFromThought} (ID: {thought_data.branchId})"
            )

        basic_info["Raw data"] = thought_data.format_for_log()

        self._metrics_logger.log_metrics_block("🧩 THOUGHT DATA:", basic_info)

        # Use field length limits constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import FieldLengthLimits

            separator_length = FieldLengthLimits.SEPARATOR_LENGTH
        except ImportError:
            pass

        self._metrics_logger.log_separator(separator_length)

    # Legacy methods for backward compatibility - these delegate to orchestrator
    async def _execute_single_agent_processing(
        self, input_prompt: str, routing_decision=None
    ) -> str:
        """Legacy method - delegates to orchestrator for backward compatibility."""
        return await self._processing_orchestrator.execute_single_agent_processing(
            input_prompt, simplified=True
        )

    async def _execute_team_processing(self, input_prompt: str) -> str:
        """Legacy method - delegates to orchestrator for backward compatibility."""
        return await self._processing_orchestrator.execute_team_processing(input_prompt)

    def _extract_response_content(self, response) -> str:
        """Legacy method - delegates to formatter for backward compatibility."""
        return self._response_formatter.extract_response_content(response)

    async def _build_context_prompt(self, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to context builder for backward compatibility."""
        return await self._context_builder.build_context_prompt(thought_data)

    def _format_response(self, content: str, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to formatter for backward compatibility."""
        return self._response_formatter.format_response(content, thought_data)
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/core/types.py:
--------------------------------------------------------------------------------

```python
"""Type definitions for better type safety."""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Protocol, TypedDict

if TYPE_CHECKING:
    from agno.agent import Agent
    from agno.models.base import Model
    from agno.team.team import Team

    from mcp_server_mas_sequential_thinking.config.modernized_config import ModelConfig
    from mcp_server_mas_sequential_thinking.core.models import ThoughtData

# Type aliases for better semantic meaning
ThoughtNumber = int
BranchId = str
ProviderName = str
TeamType = str
AgentType = str
ConfigDict = dict[str, Any]
InstructionsList = list[str]
SuccessCriteriaList = list[str]


class ExecutionMode(Enum):
    """Execution modes for different routing strategies."""

    SINGLE_AGENT = "single_agent"
    SELECTIVE_TEAM = "selective_team"  # Hybrid with specific specialists
    FULL_TEAM = "full_team"  # Complete multi-agent team


@dataclass
class CoordinationPlan:
    """Comprehensive plan combining routing and coordination decisions."""

    # Routing decisions - using string values for
simplicity strategy: str # ProcessingStrategy value complexity_level: str # ComplexityLevel value complexity_score: float # Coordination decisions execution_mode: ExecutionMode specialist_roles: list[str] task_breakdown: list[str] coordination_strategy: str # Execution parameters timeout_seconds: float expected_interactions: int team_size: int # Reasoning and metadata reasoning: str confidence: float original_thought: str @classmethod def from_routing_decision( cls, routing_decision: dict[str, Any], thought_data: ThoughtData ) -> CoordinationPlan: """Create coordination plan from adaptive routing decision.""" # Map ProcessingStrategy to ExecutionMode strategy_to_mode = { "single_agent": ExecutionMode.SINGLE_AGENT, "hybrid": ExecutionMode.SELECTIVE_TEAM, "multi_agent": ExecutionMode.FULL_TEAM, } # Map complexity to specialist roles complexity_to_specialists: dict[str, list[str]] = { "simple": ["general"], "moderate": ["planner", "analyzer"], "complex": ["planner", "researcher", "analyzer", "critic"], "highly_complex": [ "planner", "researcher", "analyzer", "critic", "synthesizer", ], } execution_mode = strategy_to_mode.get( routing_decision.strategy.value, ExecutionMode.SINGLE_AGENT ) specialists = complexity_to_specialists.get( routing_decision.complexity_level.value, ["general"] ) return cls( strategy=routing_decision.strategy.value, complexity_level=routing_decision.complexity_level.value, complexity_score=routing_decision.complexity_score, execution_mode=execution_mode, specialist_roles=specialists, team_size=len(specialists), coordination_strategy="adaptive_routing_based", task_breakdown=[ f"Process {thought_data.thought_type.value} thought", "Generate guidance", ], expected_interactions=len(specialists), timeout_seconds=300.0, # Default timeout reasoning=routing_decision.reasoning, confidence=0.8, # Default confidence for rule-based routing original_thought=thought_data.thought, ) class ProcessingMetadata(TypedDict, total=False): """Type-safe processing metadata structure.""" strategy: str complexity_score: float estimated_cost: float actual_cost: float token_usage: int processing_time: float specialists: list[str] provider: str routing_reasoning: str error_count: int retry_count: int class SessionStats(TypedDict, total=False): """Type-safe session statistics structure.""" total_thoughts: int total_cost: float total_tokens: int average_processing_time: float error_rate: float successful_thoughts: int failed_thoughts: int class ComplexityMetrics(TypedDict): """Type-safe complexity analysis metrics.""" word_count: int sentence_count: int question_count: int technical_terms: int has_branching: bool has_research_keywords: bool has_analysis_keywords: bool overall_score: float class ModelProvider(Protocol): """Protocol for model provider implementations.""" id: str cost_per_token: float class AgentFactory(Protocol): """Protocol for agent factory implementations.""" def create_team_agents(self, model: Model, team_type: str) -> dict[str, Agent]: """Create team agents with specified model and team type.""" ... class TeamBuilder(Protocol): """Protocol for team builder implementations.""" def build_team(self, config: ConfigDict, agent_factory: AgentFactory) -> Team: """Build a team with specified configuration and agent factory.""" ... class CostEstimator(Protocol): """Protocol for cost estimation with type safety.""" def estimate_cost( self, strategy: str, complexity_score: float, provider: str ) -> tuple[tuple[int, int], float]: """Estimate cost for processing strategy.""" ... 
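
# Hypothetical illustration, not part of the original file: these Protocol
# classes are satisfied structurally, so any class with a matching method
# signature conforms. For example:
#
#     class FlatRateEstimator:
#         def estimate_cost(
#             self, strategy: str, complexity_score: float, provider: str
#         ) -> tuple[tuple[int, int], float]:
#             tokens = (500, 1000)  # invented (min, max) token estimate
#             return tokens, 0.0002 * tokens[1] / 1000  # invented flat rate
#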
class ComplexityAnalyzer(Protocol): """Protocol for complexity analysis with type safety.""" def analyze(self, thought_text: str) -> ComplexityMetrics: """Analyze thought complexity and return metrics.""" ... class ThoughtProcessor(Protocol): """Protocol for thought processing with type safety.""" async def process_thought(self, thought_data: ThoughtData) -> str: """Process a thought and return the result.""" ... class SessionManager(Protocol): """Protocol for session management with type safety.""" def add_thought(self, thought_data: ThoughtData) -> None: """Add a thought to the session.""" ... def find_thought_content(self, thought_number: int) -> str: """Find thought content by number.""" ... def get_branch_summary(self) -> dict[str, int]: """Get summary of all branches.""" ... class ConfigurationProvider(Protocol): """Protocol for configuration management with type safety.""" def get_model_config(self, provider_name: str | None = None) -> ModelConfig: """Get model configuration.""" ... def check_required_api_keys(self, provider_name: str | None = None) -> list[str]: """Check for required API keys.""" ... # Custom Exception Classes class ValidationError(ValueError): """Exception raised when data validation fails.""" class ConfigurationError(Exception): """Exception raised when configuration is invalid.""" class ThoughtProcessingError(Exception): """Exception raised when thought processing fails.""" def __init__( self, message: str, metadata: ProcessingMetadata | None = None ) -> None: super().__init__(message) self.metadata: ProcessingMetadata = metadata or {} class TeamCreationError(Exception): """Exception raised when team creation fails.""" class RoutingDecisionError(ThoughtProcessingError): """Error in adaptive routing decision making.""" class CostOptimizationError(ThoughtProcessingError): """Error in cost optimization logic.""" class PersistentStorageError(ThoughtProcessingError): """Error in persistent memory storage.""" class ModelConfigurationError(ConfigurationError): """Error in model configuration.""" class ProviderError(Exception): """Error related to LLM providers.""" class AgentCreationError(Exception): """Error in agent creation.""" ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/main.py: -------------------------------------------------------------------------------- ```python """Refactored MCP Sequential Thinking Server with separated concerns and reduced complexity.""" import asyncio import sys from collections.abc import AsyncIterator from contextlib import asynccontextmanager from html import escape from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from pydantic import ValidationError from .config import ProcessingDefaults, SecurityConstants, ValidationLimits from .core import ThoughtProcessingError # Import refactored modules from .services import ( ServerConfig, ServerState, ThoughtProcessor, create_server_lifespan, create_validated_thought_data, ) from .utils import setup_logging # Initialize environment and logging load_dotenv() logger = setup_logging() # Global server state with thread safety _server_state: ServerState | None = None _thought_processor: ThoughtProcessor | None = None _processor_lock = asyncio.Lock() @asynccontextmanager async def app_lifespan(app) -> AsyncIterator[None]: """Simplified application lifespan using refactored server core.""" global _server_state, _thought_processor logger.info("Starting Sequential Thinking Server") async with 
create_server_lifespan() as server_state: _server_state = server_state logger.info("Server ready for requests") yield _server_state = None _thought_processor = None logger.info("Server stopped") # Initialize FastMCP with lifespan mcp = FastMCP(lifespan=app_lifespan) def sanitize_and_validate_input(text: str, max_length: int, field_name: str) -> str: """Sanitize and validate input with comprehensive security checks.""" # Early validation with guard clause if not text or not text.strip(): raise ValueError(f"{field_name} cannot be empty") # Strip and normalize whitespace first text = text.strip() # Check for potential prompt injection patterns using centralized constants text_lower = text.lower() for pattern in SecurityConstants.INJECTION_PATTERNS: if pattern in text_lower: raise ValueError( f"Potential security risk detected in {field_name}. " f"Input contains suspicious pattern: '{pattern}'" ) # Additional security checks if text.count('"') > 10 or text.count("'") > 10: raise ValueError( f"Excessive quotation marks detected in {field_name}. " "This may indicate an injection attempt." ) # Sanitize HTML entities and special characters sanitized_text = escape(text) # Length validation with descriptive error if len(sanitized_text) > max_length: raise ValueError( f"{field_name} exceeds maximum length of {max_length} characters" ) return sanitized_text @mcp.prompt("sequential-thinking") def sequential_thinking_prompt(problem: str, context: str = "") -> list[dict]: """Enhanced starter prompt for sequential thinking with better formatting.""" # Sanitize and validate inputs try: problem = sanitize_and_validate_input( problem, ValidationLimits.MAX_PROBLEM_LENGTH, "Problem statement" ) context = ( sanitize_and_validate_input( context, ValidationLimits.MAX_CONTEXT_LENGTH, "Context" ) if context else "" ) except ValueError as e: raise ValueError(f"Input validation failed: {e}") user_prompt = f"""Initiate sequential thinking for: {problem} {f"Context: {context}" if context else ""}""" assistant_guide = f"""Starting sequential thinking process for: {problem} Process Guidelines: 1. Estimate appropriate number of total thoughts based on problem complexity 2. Begin with: "Plan comprehensive analysis for: {problem}" 3. Use revisions (isRevision=True) to improve previous thoughts 4. Use branching (branchFromThought, branchId) for alternative approaches 5. Each thought should be detailed with clear reasoning 6. Progress systematically through analysis phases System Architecture: - Multi-Thinking methodology with intelligent routing - Factual, Emotional, Critical, Optimistic, Creative, and Synthesis perspectives - Adaptive thinking sequence based on thought complexity and type - Comprehensive integration through Synthesis thinking Ready to begin systematic analysis.""" return [ { "description": "Enhanced sequential thinking starter with comprehensive guidelines", "messages": [ {"role": "user", "content": {"type": "text", "text": user_prompt}}, { "role": "assistant", "content": {"type": "text", "text": assistant_guide}, }, ], } ] @mcp.tool() async def sequentialthinking( thought: str, thoughtNumber: int, totalThoughts: int, nextThoughtNeeded: bool, isRevision: bool, branchFromThought: int | None, branchId: str | None, needsMoreThoughts: bool, ) -> str: """Advanced sequential thinking tool with multi-agent coordination. Processes thoughts through a specialized team of AI agents that coordinate to provide comprehensive analysis, planning, research, critique, and synthesis. 
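
    Example (illustrative call; in practice the MCP client invokes this tool):
        await sequentialthinking(
            thought="Define the problem and success criteria",
            thoughtNumber=1,
            totalThoughts=5,
            nextThoughtNeeded=True,
            isRevision=False,
            branchFromThought=None,
            branchId=None,
            needsMoreThoughts=False,
        )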
Args: thought: Content of the thinking step (required) thoughtNumber: Sequence number starting from 1 (≥1) totalThoughts: Estimated total thoughts required (≥1) nextThoughtNeeded: Whether another thought step follows this one isRevision: Whether this thought revises a previous thought branchFromThought: Thought number to branch from for alternative exploration branchId: Unique identifier for the branch (required if branchFromThought set) needsMoreThoughts: Whether more thoughts are needed beyond the initial estimate Returns: Synthesized response from the multi-agent team with guidance for next steps Raises: ThoughtProcessingError: When thought processing fails ValidationError: When input validation fails RuntimeError: When server state is invalid """ # Capture server state locally to avoid async race conditions current_server_state = _server_state if current_server_state is None: return "Server Error: Server not initialized" try: # Create and validate thought data using refactored function thought_data = create_validated_thought_data( thought=thought, thoughtNumber=thoughtNumber, totalThoughts=totalThoughts, nextThoughtNeeded=nextThoughtNeeded, isRevision=isRevision, branchFromThought=branchFromThought, branchId=branchId, needsMoreThoughts=needsMoreThoughts, ) # Use captured state directly to avoid race conditions global _thought_processor async with _processor_lock: if _thought_processor is None: logger.info( "Initializing ThoughtProcessor with Multi-Thinking workflow" ) _thought_processor = ThoughtProcessor(current_server_state.session) result = await _thought_processor.process_thought(thought_data) logger.info(f"Successfully processed thought #{thoughtNumber}") return result except ValidationError as e: error_msg = f"Input validation failed for thought #{thoughtNumber}: {e}" logger.exception(error_msg) return f"Validation Error: {e}" except ThoughtProcessingError as e: error_msg = f"Processing failed for thought #{thoughtNumber}: {e}" logger.exception(error_msg) if hasattr(e, "metadata") and e.metadata: logger.exception(f"Error metadata: {e.metadata}") return f"Processing Error: {e}" except Exception as e: error_msg = f"Unexpected error processing thought #{thoughtNumber}: {e}" logger.exception(error_msg) return f"Unexpected Error: {e}" def run() -> None: """Run the MCP server with enhanced error handling and graceful shutdown.""" config = ServerConfig.from_environment() logger.info(f"Starting Sequential Thinking Server with {config.provider} provider") try: # Run server with stdio transport mcp.run(transport="stdio") except KeyboardInterrupt: logger.info("Server stopped by user (SIGINT)") except SystemExit as e: logger.info(f"Server stopped with exit code: {e.code}") raise except Exception as e: logger.error(f"Critical server error: {e}", exc_info=True) sys.exit(ProcessingDefaults.EXIT_CODE_ERROR) finally: logger.info("Server shutdown sequence complete") def main() -> None: """Main entry point with proper error handling.""" try: run() except Exception as e: logger.critical(f"Fatal error in main: {e}", exc_info=True) sys.exit(ProcessingDefaults.EXIT_CODE_ERROR) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/config/constants.py: -------------------------------------------------------------------------------- ```python """Constants for the MCP Sequential Thinking Server.""" from enum import Enum
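# --- Added illustrative sketch (not part of the original module) ---
# The TokenCosts rates defined below are "cost per 1K tokens"; callers
# typically scale a token count by one of them. This helper and its
# default rate are hypothetical:
def _example_estimate_cost(tokens: int, cost_per_1k: float = 0.0002) -> float:
    """Rough dollar cost of a call: (tokens / 1000) * provider rate."""
    return (tokens / 1000.0) * cost_per_1k
# e.g. _example_estimate_cost(6000, 0.0002) == 0.0012 (about a tenth of a cent)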
from typing import ClassVar class TokenCosts: """Token cost constants for different providers (cost per 1000 tokens).""" DEEPSEEK_COST_PER_1K = 0.0002 GROQ_COST_PER_1K = 0.0001 OPENROUTER_COST_PER_1K = 0.001 GITHUB_COST_PER_1K = 0.0005 OLLAMA_COST_PER_1K = 0.0000 DEFAULT_COST_PER_1K = 0.0002 class ComplexityScoring: """Complexity analysis scoring constants.""" MAX_SCORE = 100.0 WORD_COUNT_MAX_POINTS = 15 WORD_COUNT_DIVISOR = 20 SENTENCE_MULTIPLIER = 2 SENTENCE_MAX_POINTS = 10 QUESTION_MULTIPLIER = 3 QUESTION_MAX_POINTS = 15 TECHNICAL_TERM_MULTIPLIER = 2 TECHNICAL_TERM_MAX_POINTS = 20 BRANCHING_MULTIPLIER = 5 BRANCHING_MAX_POINTS = 15 RESEARCH_MULTIPLIER = 3 RESEARCH_MAX_POINTS = 15 ANALYSIS_MULTIPLIER = 2 ANALYSIS_MAX_POINTS = 10 class TokenEstimates: """Token estimation ranges by complexity and strategy.""" # Single agent token estimates (min, max) SINGLE_AGENT_SIMPLE = (400, 800) SINGLE_AGENT_MODERATE = (600, 1200) SINGLE_AGENT_COMPLEX = (800, 1600) SINGLE_AGENT_HIGHLY_COMPLEX = (1000, 2000) # Multi-agent token estimates (min, max) MULTI_AGENT_SIMPLE = (2000, 4000) MULTI_AGENT_MODERATE = (3000, 6000) MULTI_AGENT_COMPLEX = (4000, 8000) MULTI_AGENT_HIGHLY_COMPLEX = (5000, 10000) class ValidationLimits: """Input validation and system limits.""" MAX_PROBLEM_LENGTH = 500 MAX_CONTEXT_LENGTH = 300 MAX_THOUGHTS_PER_SESSION = 1000 MAX_BRANCHES_PER_SESSION = 50 MAX_THOUGHTS_PER_BRANCH = 100 GITHUB_TOKEN_LENGTH = 40 MIN_THOUGHT_NUMBER = 1 class DefaultTimeouts: """Default timeout values in seconds.""" PROCESSING_TIMEOUT = 30.0 DEEPSEEK_PROCESSING_TIMEOUT = 120.0 # Legacy timeout for Deepseek (deprecated) MULTI_AGENT_TIMEOUT_MULTIPLIER = 2.0 # Multiply timeout for multi-agent # Adaptive timeout strategy (v0.5.1+) ADAPTIVE_BASE_DEEPSEEK = 90.0 # Base timeout for Deepseek with adaptive scaling ADAPTIVE_BASE_GROQ = 20.0 # Base timeout for Groq ADAPTIVE_BASE_OPENAI = 45.0 # Base timeout for OpenAI ADAPTIVE_BASE_DEFAULT = 30.0 # Default base timeout # Maximum timeouts (safety ceiling) MAX_TIMEOUT_DEEPSEEK = 300.0 # 5 minutes absolute maximum for Deepseek MAX_TIMEOUT_DEFAULT = 180.0 # 3 minutes maximum for others # Complexity multipliers for adaptive timeouts COMPLEXITY_SIMPLE_MULTIPLIER = 1.0 COMPLEXITY_MODERATE_MULTIPLIER = 1.5 COMPLEXITY_COMPLEX_MULTIPLIER = 2.0 COMPLEXITY_HIGHLY_COMPLEX_MULTIPLIER = 3.0 # Retry configuration RETRY_EXPONENTIAL_BASE = 1.5 # Exponential backoff base MAX_RETRY_ATTEMPTS = 2 # Maximum retry attempts SESSION_CLEANUP_DAYS = 30 RECENT_SESSION_KEEP_COUNT = 100 class LoggingLimits: """Logging configuration constants.""" LOG_FILE_MAX_BYTES = 5 * 1024 * 1024 # 5MB LOG_BACKUP_COUNT = 3 SENSITIVE_DATA_MIN_LENGTH = 8 class QualityThresholds: """Quality scoring and budget utilization thresholds.""" DEFAULT_QUALITY_THRESHOLD = 0.7 HIGH_BUDGET_UTILIZATION = 0.8 VERY_HIGH_BUDGET_UTILIZATION = 0.9 MULTI_AGENT_HIGH_USAGE = 0.7 SINGLE_AGENT_HIGH_USAGE = 0.8 MINIMUM_USAGE_FOR_SUGGESTIONS = 10 SIGNIFICANT_COST_THRESHOLD = 0.01 class ProviderDefaults: """Default provider configurations.""" DEFAULT_QUALITY_SCORE = 0.8 DEFAULT_RESPONSE_TIME = 2.0 DEFAULT_UPTIME_SCORE = 0.95 DEFAULT_ERROR_RATE = 0.05 DEFAULT_CONTEXT_LENGTH = 4096 class ComplexityThresholds: """Complexity level thresholds for scoring.""" SIMPLE_MAX = 25.0 MODERATE_MAX = 50.0 COMPLEX_MAX = 75.0 # HIGHLY_COMPLEX is anything above COMPLEX_MAX class DefaultValues: """Default configuration values.""" DEFAULT_LLM_PROVIDER = "deepseek" DEFAULT_TEAM_MODE = "standard" DEFAULT_LOG_LEVEL = "INFO" DEFAULT_MAX_RETRIES = 3 DEFAULT_TIMEOUT = 
30.0 class PerformanceMetrics: """Performance measurement constants.""" # Efficiency calculation thresholds EFFICIENCY_TIME_THRESHOLD = 60.0 # seconds PERFECT_EXECUTION_CONSISTENCY = 1.0 DEFAULT_EXECUTION_CONSISTENCY = 0.9 PERFECT_EFFICIENCY_SCORE = 1.0 MINIMUM_EFFICIENCY_SCORE = 0.5 # Retry and sleep constants RETRY_SLEEP_DURATION = 1.0 # seconds # Logging formatting SEPARATOR_LENGTH = 50 class ProcessingDefaults: """Default values for thought processing.""" ERROR_COUNT_INITIAL = 1 RETRY_COUNT_INITIAL = 0 PROCESSING_TIME_INITIAL = 0.0 TEAM_INITIALIZER_INDEX = 1 SINGLE_AGENT_TIMEOUT_MULTIPLIER = 0.5 EXIT_CODE_ERROR = 1 class FieldLengthLimits: """Field length limits for various inputs.""" MIN_STRING_LENGTH = 1 MAX_STANDARD_STRING = 2000 MAX_DESCRIPTION_LENGTH = 1000 MAX_BRANCH_ID_LENGTH = 100 SEPARATOR_LENGTH = 50 class DatabaseConstants: """Database configuration constants.""" SESSION_CLEANUP_BATCH_SIZE = 100 THOUGHT_QUERY_LIMIT = 1000 CONNECTION_POOL_SIZE = 5 CONNECTION_POOL_OVERFLOW = 10 class ThoughtProcessingLimits: """Limits for thought processing workflow.""" MIN_THOUGHT_SEQUENCE = 1 MAX_TEAM_DELEGATION_COUNT = 2 ANALYSIS_TIME_LIMIT_SECONDS = 5 MIN_PROCESSING_STEPS = 1 MAX_PROCESSING_STEPS = 6 class TechnicalTerms: """Technical terms for complexity analysis.""" KEYWORDS: ClassVar[list[str]] = [ "algorithm", "data", "analysis", "system", "process", "design", "implementation", "architecture", "framework", "model", "structure", "optimization", "performance", "scalability", "integration", "api", "database", "security", "authentication", "authorization", "testing", "deployment", "configuration", "monitoring", "logging", "debugging", "refactoring", "migration", "synchronization", "caching", "protocol", "interface", "inheritance", "polymorphism", "abstraction", "encapsulation", ] class DefaultSettings: """Default application settings.""" DEFAULT_PROVIDER = "deepseek" DEFAULT_COMPLEXITY_THRESHOLD = 30.0 DEFAULT_TOKEN_BUFFER = 0.2 DEFAULT_SESSION_TIMEOUT = 3600 class CostOptimizationConstants: """Constants for cost optimization calculations.""" # Quality scoring weights QUALITY_WEIGHT = 0.4 COST_WEIGHT = 0.3 SPEED_WEIGHT = 0.2 RELIABILITY_WEIGHT = 0.1 # Cost calculation factors COST_NORMALIZATION_FACTOR = 0.0003 COST_EPSILON = 0.0001 # Prevent division by zero DEFAULT_COST_ESTIMATE = 0.0002 SPEED_NORMALIZATION_BASE = 10 SPEED_THRESHOLD = 1 # Quality scoring bounds MIN_QUALITY_SCORE = 0.0 MAX_QUALITY_SCORE = 1.0 # Budget utilization thresholds HIGH_BUDGET_UTILIZATION = 0.8 MODERATE_BUDGET_UTILIZATION = 0.7 CRITICAL_BUDGET_UTILIZATION = 0.9 # Complexity bonuses QUALITY_COMPLEXITY_BONUS = 0.2 COST_COMPLEXITY_BONUS = 0.0001 # Provider optimization HIGH_USAGE_PENALTY = 2.0 MODERATE_USAGE_PENALTY = 0.5 QUALITY_UPDATE_WEIGHT = 0.1 OLD_QUALITY_WEIGHT = 0.9 # Usage analysis thresholds MIN_DATA_THRESHOLD = 10 HIGH_MULTI_AGENT_RATIO = 0.7 HIGH_SINGLE_AGENT_RATIO = 0.8 MINIMUM_COST_DIFFERENCE = 0.01 # Provider-specific configurations GROQ_RATE_LIMIT = 14400 GROQ_CONTEXT_LENGTH = 32768 GROQ_QUALITY_SCORE = 0.75 GROQ_RESPONSE_TIME = 0.8 DEEPSEEK_QUALITY_SCORE = 0.85 DEEPSEEK_CONTEXT_LENGTH = 128000 GITHUB_CONTEXT_LENGTH = 128000 OPENROUTER_RESPONSE_TIME = 3.0 OPENROUTER_CONTEXT_LENGTH = 200000 OLLAMA_QUALITY_SCORE = 0.70 OLLAMA_RESPONSE_TIME = 5.0 OLLAMA_CONTEXT_LENGTH = 8192 class ComplexityAnalysisConstants: """Constants for complexity analysis calculations.""" # Multilingual text analysis requires different tokenization strategies CHINESE_WORD_RATIO = 2 # Character-based scripts need different word 
boundaries CHINESE_DOMINANCE_THRESHOLD = 0.3 # Script detection threshold for optimization # Complexity scoring weights (extracted from adaptive_routing.py) WORD_COUNT_WEIGHT = 0.15 SENTENCE_WEIGHT = 0.10 QUESTION_WEIGHT = 0.15 TECHNICAL_TERM_WEIGHT = 0.20 BRANCHING_WEIGHT = 0.15 RESEARCH_WEIGHT = 0.15 ANALYSIS_DEPTH_WEIGHT = 0.10 # Complexity level thresholds SIMPLE_THRESHOLD = 25.0 MODERATE_THRESHOLD = 50.0 COMPLEX_THRESHOLD = 75.0 # Branching bonus for actual branch context BRANCHING_CONTEXT_BONUS = 2 # Text analysis limits MAX_PREVIEW_LENGTH = 200 MIN_SENTENCE_LENGTH = 3 class SecurityConstants: """Security-related constants for input validation.""" # Patterns that indicate potential prompt injection attempts INJECTION_PATTERNS: ClassVar[list[str]] = [ # System/role instruction injections "system:", "user:", "assistant:", "role:", # Prompt escape attempts "ignore previous", "ignore all", "disregard", # Code execution attempts "```python", "```bash", "exec(", "eval(", "__import__", # Instruction manipulation "new instructions", "override", "instead of", # Data extraction attempts "print(", "console.log", "alert(", "document.cookie", ] class ProcessingStrategy(Enum): """Processing strategy enumeration.""" SINGLE_AGENT = "single_agent" MULTI_AGENT = "multi_agent" ADAPTIVE = "adaptive" ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/routing/ai_complexity_analyzer.py: -------------------------------------------------------------------------------- ```python """AI-Powered Complexity Analyzer. Uses an AI agent to intelligently assess thought complexity, replacing the rule-based approach with more nuanced understanding of context, semantics, and depth. """ import json import logging from typing import TYPE_CHECKING, Any from agno.agent import Agent from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config from .complexity_types import ComplexityAnalyzer, ComplexityMetrics if TYPE_CHECKING: from mcp_server_mas_sequential_thinking.core.models import ThoughtData logger = logging.getLogger(__name__) COMPLEXITY_ANALYSIS_PROMPT = """ You are an expert complexity analyzer for thought processing systems. Your task is to analyze the cognitive complexity of a given thought and return a structured assessment. Analyze the following thought and provide complexity metrics: **Thought to Analyze:** "{thought}" **Instructions:** 1. Consider semantic depth, philosophical implications, conceptual complexity 2. Evaluate required cognitive resources (memory, reasoning, creativity) 3. Assess multi-dimensional thinking requirements 4. 
Consider cultural and linguistic nuances across different languages **Response Format:** Return ONLY a valid JSON object with these exact fields: ```json {{ "complexity_score": <float 0-100>, "word_count": <int>, "sentence_count": <int>, "question_count": <int>, "technical_terms": <int>, "branching_references": <int>, "research_indicators": <int>, "analysis_depth": <int>, "philosophical_depth_boost": <int 0-15>, "primary_problem_type": "<FACTUAL|EMOTIONAL|CRITICAL|OPTIMISTIC|CREATIVE|SYNTHESIS|EVALUATIVE|PHILOSOPHICAL|DECISION>", "thinking_modes_needed": ["<list of required thinking modes>"], "reasoning": "<brief explanation of scoring and problem type analysis>" }} ``` **Scoring Guidelines:** - 0-10: Simple factual questions or basic statements - 11-25: Moderate complexity, requires some analysis - 26-50: Complex topics requiring deep thinking - 51-75: Highly complex, multi-faceted problems - 76-100: Extremely complex philosophical/existential questions **Problem Type Analysis:** - FACTUAL: Information seeking, definitions, statistics (what, when, where, who) - EMOTIONAL: Feelings, intuition, personal experiences (feel, sense, worry) - CRITICAL: Risk assessment, problems, disadvantages (issue, risk, wrong) - OPTIMISTIC: Benefits, opportunities, positive aspects (good, benefit, advantage) - CREATIVE: Innovation, alternatives, new ideas (creative, brainstorm, imagine) - SYNTHESIS: Integration, summary, holistic view (combine, overall, strategy) - EVALUATIVE: Comparison, assessment, judgment (compare, evaluate, best) - PHILOSOPHICAL: Meaning, existence, values (purpose, meaning, ethics) - DECISION: Choice making, selection, recommendations (decide, choose, should) **Thinking Modes Needed:** Select appropriate modes based on problem characteristics: - FACTUAL thinking for information gathering - EMOTIONAL thinking for intuitive insights - CRITICAL thinking for risk analysis - OPTIMISTIC thinking for opportunity identification - CREATIVE thinking for innovation - SYNTHESIS thinking for integration **Special Considerations:** - Philosophical questions like "Why do we live if we die?" 
should score 40-70+ - Short but profound questions can have high complexity - Consider emotional and existential weight, not just length - Multilingual philosophical concepts preserve cultural context Analyze now: """ class AIComplexityAnalyzer(ComplexityAnalyzer): """AI-powered complexity analyzer using language models.""" def __init__(self, model_config: Any | None = None) -> None: self.model_config = model_config or get_model_config() self._agent: Agent | None = None def _get_agent(self) -> Agent: """Lazy initialization of the analysis agent.""" if self._agent is None: model = self.model_config.create_agent_model() self._agent = Agent( name="ComplexityAnalyzer", model=model, introduction="You are an expert in cognitive complexity assessment, specializing in philosophy and deep thinking analysis.", ) return self._agent async def analyze(self, thought_data: "ThoughtData") -> ComplexityMetrics: """Analyze thought complexity using AI agent.""" logger.info("🤖 AI COMPLEXITY ANALYSIS:") logger.info(f" 📝 Analyzing: {thought_data.thought[:100]}...") try: agent = self._get_agent() prompt = COMPLEXITY_ANALYSIS_PROMPT.format(thought=thought_data.thought) # Get AI analysis result = await agent.arun(input=prompt) # Extract JSON response response_text = self._extract_response_content(result) complexity_data = self._parse_json_response(response_text) # Create metrics object with AI assessment metrics = ComplexityMetrics( complexity_score=complexity_data.get("complexity_score", 0.0), word_count=complexity_data.get("word_count", 0), sentence_count=complexity_data.get("sentence_count", 0), question_count=complexity_data.get("question_count", 0), technical_terms=complexity_data.get("technical_terms", 0), branching_references=complexity_data.get("branching_references", 0), research_indicators=complexity_data.get("research_indicators", 0), analysis_depth=complexity_data.get("analysis_depth", 0), philosophical_depth_boost=complexity_data.get( "philosophical_depth_boost", 0 ), # AI Analysis Results (critical for routing) primary_problem_type=complexity_data.get( "primary_problem_type", "GENERAL" ), thinking_modes_needed=complexity_data.get( "thinking_modes_needed", ["SYNTHESIS"] ), analyzer_type="ai", reasoning=complexity_data.get("reasoning", "AI analysis"), ) logger.info(f" 🎯 AI Complexity Score: {metrics.complexity_score:.1f}/100") logger.info( f" 💭 Reasoning: {complexity_data.get('reasoning', 'No reasoning provided')[:100]}..." 
) return metrics except Exception as e: logger.exception(f"❌ AI complexity analysis failed: {e}") # Fallback to basic analysis return self._basic_fallback_analysis(thought_data) def _extract_response_content(self, result: Any) -> str: """Extract content from agent response.""" if hasattr(result, "content"): return str(result.content) return str(result) def _parse_json_response(self, response_text: str) -> dict[str, Any]: """Parse JSON from AI response, handling various formats.""" # Try to find JSON in the response lines = response_text.strip().split("\n") for line in lines: line = line.strip() if line.startswith("{") and line.endswith("}"): try: return json.loads(line) except json.JSONDecodeError: continue # Try to extract JSON from code blocks if "```json" in response_text: start = response_text.find("```json") + 7 end = response_text.find("```", start) if end > start: json_text = response_text[start:end].strip() try: return json.loads(json_text) except json.JSONDecodeError: pass # Try parsing the entire response as JSON try: return json.loads(response_text) except json.JSONDecodeError: logger.warning( f"Failed to parse AI response as JSON: {response_text[:200]}" ) raise ValueError("Could not parse AI complexity analysis response") def _basic_fallback_analysis( self, thought_data: "ThoughtData" ) -> ComplexityMetrics: """Fallback to basic analysis if AI fails.""" logger.warning("🔄 Falling back to basic complexity analysis") text = thought_data.thought.lower() # Basic metrics words = len(text.split()) sentences = len([s for s in text.split(".") if s.strip()]) questions = text.count("?") + text.count("？") # Simple heuristics philosophical_terms = [ "意义", "存在", "生命", "死亡", "为什么", "why", "meaning", "life", "death", ] philosophical_count = sum(1 for term in philosophical_terms if term in text) # Basic scoring base_score = min(words * 2 + questions * 5 + philosophical_count * 10, 100) return ComplexityMetrics( complexity_score=base_score, word_count=words, sentence_count=max(sentences, 1), question_count=questions, technical_terms=philosophical_count, branching_references=0, research_indicators=0, analysis_depth=philosophical_count, philosophical_depth_boost=min(philosophical_count * 5, 15), # Basic AI analysis results for fallback primary_problem_type="PHILOSOPHICAL" if philosophical_count > 0 else "GENERAL", thinking_modes_needed=["SYNTHESIS", "CREATIVE"] if philosophical_count > 2 else ["FACTUAL"], analyzer_type="basic_fallback", reasoning="Fallback analysis due to AI failure", ) # No more monkey patching needed - complexity_score is now a direct field def create_ai_complexity_analyzer() -> AIComplexityAnalyzer: """Create AI complexity analyzer instance.""" return AIComplexityAnalyzer() ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/processing_orchestrator.py: -------------------------------------------------------------------------------- ```python """Processing orchestration service for thought coordination. This service handles the orchestration of different processing strategies, coordinating between single-agent and multi-agent approaches, and managing execution flows based on complexity analysis.
""" import time from typing import TYPE_CHECKING from agno.agent import Agent from mcp_server_mas_sequential_thinking.config import get_model_config from mcp_server_mas_sequential_thinking.core import ( SessionMemory, ThoughtProcessingError, ) from mcp_server_mas_sequential_thinking.utils import setup_logging if TYPE_CHECKING: from mcp_server_mas_sequential_thinking.routing import ComplexityLevel from mcp_server_mas_sequential_thinking.services.response_processor import ( ResponseProcessor, ) from mcp_server_mas_sequential_thinking.services.retry_handler import ( TeamProcessingRetryHandler, ) logger = setup_logging() class ProcessingOrchestrator: """Service responsible for orchestrating different processing strategies.""" def __init__( self, session: SessionMemory, response_processor: "ResponseProcessor", retry_handler: "TeamProcessingRetryHandler", ) -> None: """Initialize the processing orchestrator. Args: session: The session memory instance response_processor: Response processing service retry_handler: Retry handling service """ self._session = session self._response_processor = response_processor self._retry_handler = retry_handler # Initialize performance tracking from mcp_server_mas_sequential_thinking.infrastructure import ( MetricsLogger, ) self._metrics_logger = MetricsLogger() async def execute_single_agent_processing( self, input_prompt: str, simplified: bool = False ) -> str: """Execute single-agent processing for simple thoughts. Args: input_prompt: The input prompt to process simplified: Whether to use simplified prompt format Returns: Processed response content """ try: # Create a lightweight agent for single processing simple_agent = self._create_simple_agent( processing_type="simple thought" if simplified else "thought", use_markdown=not simplified, ) # Optionally simplify the prompt prompt_to_use = ( self._create_simplified_prompt(input_prompt) if simplified else input_prompt ) # Log single agent call details self._log_single_agent_call(simple_agent, prompt_to_use) start_time = time.time() response = await simple_agent.arun(prompt_to_use) processing_time = time.time() - start_time # Extract content from Agno RunOutput response_content = self._extract_response_content(response) # Log single agent response details self._log_single_agent_response(response_content, processing_time) logger.info("Single-agent processing completed successfully") return response_content except Exception as e: logger.warning(f"Single-agent processing failed, falling back to team: {e}") # Fallback to team processing return await self.execute_team_processing(input_prompt) async def execute_team_processing(self, input_prompt: str) -> str: """Execute team processing without timeout restrictions. Args: input_prompt: The input prompt to process Returns: Processed response content """ try: response = await self._session.team.arun(input_prompt) return self._extract_response_content(response) except Exception as e: raise ThoughtProcessingError(f"Team coordination failed: {e}") from e async def execute_team_processing_with_retries( self, input_prompt: str, complexity_level: "ComplexityLevel" ) -> str: """Execute team processing using centralized retry handler. 
Args: input_prompt: The input prompt to process complexity_level: Complexity level for retry strategy Returns: Processed response content """ team_info = self._get_team_info() self._metrics_logger.log_team_details(team_info) self._metrics_logger.log_input_details(input_prompt) async def team_operation(): start_time = time.time() response = await self._session.team.arun(input_prompt) processing_time = time.time() - start_time processed_response = self._response_processor.process_response( response, processing_time, "MULTI-AGENT TEAM" ) return processed_response.content return await self._retry_handler.execute_team_processing( team_operation, team_info, complexity_level.value ) def _create_simple_agent( self, processing_type: str = "thought", use_markdown: bool = False ) -> Agent: """Create a simple agent for single-thought processing. Args: processing_type: Type of processing for instructions use_markdown: Whether to enable markdown formatting Returns: Configured Agent instance """ model_config = get_model_config() single_model = model_config.create_team_model() return Agent( name="SimpleProcessor", role="Simple Thought Processor", description=f"Processes {processing_type}s efficiently without multi-agent overhead", model=single_model, instructions=[ f"You are processing a {processing_type} efficiently.", "Provide a focused, clear response.", "Include guidance for the next step.", "Be concise but helpful.", ], markdown=use_markdown, ) def _create_simplified_prompt(self, input_prompt: str) -> str: """Create a simplified prompt for single-agent processing. Args: input_prompt: The original input prompt Returns: Simplified prompt """ return f"""Process this thought efficiently: {input_prompt} Provide a focused response with clear guidance for the next step.""" def _extract_response_content(self, response) -> str: """Extract clean content from Agno RunOutput objects. Args: response: The response object from agent processing Returns: Extracted text content """ from mcp_server_mas_sequential_thinking.services.response_formatter import ( ResponseExtractor, ) return ResponseExtractor.extract_content(response) def _get_team_info(self) -> dict: """Extract team information for logging and retry handling. Returns: Dictionary containing team information """ team = self._session.team return { "name": team.name, "member_count": len(team.members), "leader_class": team.model.__class__.__name__, "leader_model": getattr(team.model, "id", "unknown"), "member_names": ", ".join([m.name for m in team.members]), } def _log_single_agent_call(self, agent: Agent, prompt: str) -> None: """Log single agent call details. Args: agent: The agent being used prompt: The prompt being processed """ logger.info("🤖 SINGLE-AGENT CALL:") logger.info(f" Agent: {agent.name} ({agent.role})") logger.info( f" Model: {getattr(agent.model, 'id', 'unknown')} ({agent.model.__class__.__name__})" ) self._log_input_details(prompt) def _log_single_agent_response(self, content: str, processing_time: float) -> None: """Log single agent response details. Args: content: The response content processing_time: Processing time in seconds """ logger.info("✅ SINGLE-AGENT RESPONSE:") self._log_output_details(content, processing_time) def _log_input_details( self, input_prompt: str, context_description: str = "input" ) -> None: """Log input details with consistent formatting. Args: input_prompt: The input prompt to log context_description: Description of the context """ logger.info(f" Input length: {len(input_prompt)} chars") logger.info(f" Full {context_description}:\n{input_prompt}") # Use performance metrics constant if available separator_length = 60 # Default fallback try: from mcp_server_mas_sequential_thinking.config import PerformanceMetrics separator_length = PerformanceMetrics.SEPARATOR_LENGTH except ImportError: pass logger.info(f" {'=' * separator_length}") def _log_output_details( self, response_content: str, processing_time: float, context_description: str = "response", ) -> None: """Log output details with consistent formatting. Args: response_content: The response content to log processing_time: Processing time in seconds context_description: Description of the context """ logger.info(f" Processing time: {processing_time:.3f}s") logger.info(f" Output length: {len(response_content)} chars") logger.info(f" Full {context_description}:\n{response_content}") # Use performance metrics constant if available separator_length = 60 # Default fallback try: from mcp_server_mas_sequential_thinking.config import PerformanceMetrics separator_length = PerformanceMetrics.SEPARATOR_LENGTH except ImportError: pass logger.info(f" {'=' * separator_length}") ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/server_core.py: -------------------------------------------------------------------------------- ```python """Refactored server core with separated concerns and reduced complexity.""" import os from abc import ABC, abstractmethod from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass from pathlib import Path from typing import Any # Lazy import to break circular dependency from pydantic import ValidationError from mcp_server_mas_sequential_thinking.config import ( DefaultTimeouts, DefaultValues, PerformanceMetrics, check_required_api_keys, ) from mcp_server_mas_sequential_thinking.core import ( ConfigurationError, SessionMemory, ThoughtData, ) from mcp_server_mas_sequential_thinking.utils import setup_logging logger = setup_logging() class LoggingMixin: """Mixin class providing common logging utilities with reduced duplication.""" @staticmethod def _log_section_header( title: str, separator_length: int = PerformanceMetrics.SEPARATOR_LENGTH ) -> None: """Log a formatted section header.""" logger.info(f"{title}") @staticmethod def _log_metrics_block(title: str, metrics: dict[str, Any]) -> None: """Log a formatted metrics block.""" logger.info(f"{title}") for key, value in metrics.items(): if isinstance(value, float): logger.info(f" {key}: {value:.2f}") elif isinstance(value, (int, str)): logger.info(f" {key}: {value}") else: logger.info(f" {key}: {value}") @staticmethod def _log_separator(length: int = PerformanceMetrics.SEPARATOR_LENGTH) -> None: """Log a separator line.""" logger.info(f" {'=' * length}") @staticmethod def _calculate_efficiency_score(processing_time: float) -> float: """Calculate efficiency score using standard metrics.""" return ( PerformanceMetrics.PERFECT_EFFICIENCY_SCORE if processing_time < PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD else max( PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE, PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD / processing_time, ) ) @staticmethod def _calculate_execution_consistency(success_indicator: bool) -> float: """Calculate execution consistency using standard
metrics.""" return ( PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY if success_indicator else PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY ) @dataclass(frozen=True, slots=True) class ServerConfig: """Immutable server configuration with clear defaults.""" provider: str team_mode: str = DefaultValues.DEFAULT_TEAM_MODE log_level: str = DefaultValues.DEFAULT_LOG_LEVEL max_retries: int = DefaultValues.DEFAULT_MAX_RETRIES timeout: float = DefaultTimeouts.PROCESSING_TIMEOUT @classmethod def from_environment(cls) -> "ServerConfig": """Create config from environment with sensible defaults.""" return cls( provider=os.environ.get("LLM_PROVIDER", DefaultValues.DEFAULT_LLM_PROVIDER), team_mode=os.environ.get( "TEAM_MODE", DefaultValues.DEFAULT_TEAM_MODE ).lower(), log_level=os.environ.get("LOG_LEVEL", DefaultValues.DEFAULT_LOG_LEVEL), max_retries=int( os.environ.get("MAX_RETRIES", str(DefaultValues.DEFAULT_MAX_RETRIES)) ), timeout=float( os.environ.get("TIMEOUT", str(DefaultValues.DEFAULT_TIMEOUT)) ), ) class ServerInitializer(ABC): """Abstract initializer for server components.""" @abstractmethod async def initialize(self, config: ServerConfig) -> None: """Initialize server component.""" @abstractmethod async def cleanup(self) -> None: """Clean up server component.""" class EnvironmentInitializer(ServerInitializer): """Handles environment validation and setup.""" async def initialize(self, config: ServerConfig) -> None: """Validate environment requirements with enhanced error handling.""" logger.info(f"Initializing environment with {config.provider} provider") try: # Graceful degradation prevents startup failures from missing optional keys missing_keys = check_required_api_keys() if missing_keys: logger.warning(f"Missing API keys: {', '.join(missing_keys)}") # Note: Don't fail here as some providers might not require keys # Ensure log directory exists log_dir = Path.home() / ".sequential_thinking" / "logs" if not log_dir.exists(): logger.info(f"Creating log directory: {log_dir}") try: log_dir.mkdir(parents=True, exist_ok=True) except OSError as e: raise ConfigurationError( f"Failed to create log directory {log_dir}: {e}" ) from e except Exception as e: if not isinstance(e, ConfigurationError): raise ConfigurationError( f"Environment initialization failed: {e}" ) from e raise async def cleanup(self) -> None: """No cleanup needed for environment.""" class ServerState: """Manages server state with proper lifecycle and separation of concerns.""" def __init__(self) -> None: self._config: ServerConfig | None = None self._session: SessionMemory | None = None self._initializers = [ EnvironmentInitializer(), ] async def initialize(self, config: ServerConfig) -> None: """Initialize all server components.""" self._config = config # Ordered initialization prevents dependency conflicts for initializer in self._initializers: await initializer.initialize(config) # Session-based architecture simplifies state management self._session = SessionMemory() logger.info( "Server state initialized successfully with multi-thinking workflow" ) async def cleanup(self) -> None: """Clean up all server components.""" # Clean up in reverse order for initializer in reversed(self._initializers): await initializer.cleanup() self._config = None self._session = None logger.info("Server state cleaned up") @property def config(self) -> ServerConfig: """Get current configuration.""" if self._config is None: raise RuntimeError("Server not initialized - config unavailable") return self._config @property def session(self) -> 
SessionMemory: """Get current session.""" if self._session is None: raise RuntimeError("Server not initialized - session unavailable") return self._session # Remove redundant exception definition as it's now in types.py class ThoughtProcessor: """Simplified thought processor that delegates to the refactored implementation. This maintains backward compatibility while using the new service-based architecture. """ def __init__(self, session: SessionMemory) -> None: # Import and delegate to the refactored implementation from .thought_processor_refactored import ( ThoughtProcessor as RefactoredProcessor, ) self._processor = RefactoredProcessor(session) async def process_thought(self, thought_data: ThoughtData) -> str: """Process a thought through the team with comprehensive error handling.""" return await self._processor.process_thought(thought_data) # Legacy methods for backward compatibility - delegate to refactored processor def _extract_response_content(self, response) -> str: """Legacy method - delegates to refactored processor.""" return self._processor._extract_response_content(response) def _build_context_prompt(self, thought_data: ThoughtData) -> str: """Legacy method - delegates to refactored processor.""" return self._processor._build_context_prompt(thought_data) def _format_response(self, content: str, thought_data: ThoughtData) -> str: """Legacy method - delegates to refactored processor.""" return self._processor._format_response(content, thought_data) async def _execute_single_agent_processing( self, input_prompt: str, routing_decision=None ) -> str: """Legacy method - delegates to refactored processor.""" return await self._processor._execute_single_agent_processing( input_prompt, routing_decision ) async def _execute_team_processing(self, input_prompt: str) -> str: """Legacy method - delegates to refactored processor.""" return await self._processor._execute_team_processing(input_prompt) @asynccontextmanager async def create_server_lifespan() -> AsyncIterator[ServerState]: """Create server lifespan context manager with proper resource management.""" config = ServerConfig.from_environment() server_state = ServerState() try: await server_state.initialize(config) logger.info("Server started successfully") yield server_state except Exception as e: logger.error(f"Server initialization failed: {e}", exc_info=True) raise ServerInitializationError(f"Failed to initialize server: {e}") from e finally: await server_state.cleanup() logger.info("Server shutdown complete") class ServerInitializationError(Exception): """Custom exception for server initialization failures.""" def create_validated_thought_data( thought: str, thoughtNumber: int, totalThoughts: int, nextThoughtNeeded: bool, isRevision: bool, branchFromThought: int | None, branchId: str | None, needsMoreThoughts: bool, ) -> ThoughtData: """Create and validate thought data with enhanced error reporting.""" try: return ThoughtData( thought=thought.strip(), thoughtNumber=thoughtNumber, totalThoughts=totalThoughts, nextThoughtNeeded=nextThoughtNeeded, isRevision=isRevision, branchFromThought=branchFromThought, branchId=branchId.strip() if branchId else None, needsMoreThoughts=needsMoreThoughts, ) except ValidationError as e: raise ValueError(f"Invalid thought data: {e}") from e # Global server state with workflow support _server_state: ServerState | None = None _thought_processor: ThoughtProcessor | None = None async def get_thought_processor() -> ThoughtProcessor: """Get or create the global thought processor with workflow 
support.""" global _thought_processor, _server_state if _thought_processor is None: if _server_state is None: raise RuntimeError( "Server not properly initialized - _server_state is None. Ensure app_lifespan is running." ) logger.info("Initializing ThoughtProcessor with multi-thinking workflow") _thought_processor = ThoughtProcessor(_server_state.session) return _thought_processor ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/config/modernized_config.py: -------------------------------------------------------------------------------- ```python """Modernized configuration management with dependency injection and clean abstractions. Clean configuration management with modern Python patterns. """ import os from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, Protocol, runtime_checkable from agno.models.anthropic import Claude from agno.models.base import Model from agno.models.deepseek import DeepSeek from agno.models.groq import Groq from agno.models.ollama import Ollama from agno.models.openai import OpenAIChat from agno.models.openrouter import OpenRouter class GitHubOpenAI(OpenAIChat): """OpenAI provider configured for GitHub Models API with enhanced validation.""" @staticmethod def _validate_github_token(token: str) -> None: """Validate GitHub token format with comprehensive security checks.""" if not token: raise ValueError("GitHub token is required but not provided") # Strip whitespace and validate basic format token = token.strip() # Valid GitHub token prefixes with their expected lengths token_specs = { "ghp_": 40, # Classic personal access token "github_pat_": lambda t: len(t) >= 93, # Fine-grained PAT (variable length) "gho_": 40, # OAuth app token "ghu_": 40, # User-to-server token "ghs_": 40, # Server-to-server token } # Check token prefix and length valid_token = False for prefix, expected_length in token_specs.items(): if token.startswith(prefix): if callable(expected_length): if expected_length(token): valid_token = True break elif len(token) == expected_length: valid_token = True break if not valid_token: raise ValueError( "Invalid GitHub token format. Token must be a valid GitHub personal " "access token, " "OAuth token, or fine-grained personal access token with correct " "prefix and length." ) # Enhanced entropy validation to prevent fake tokens token_body = ( token[4:] if token.startswith("ghp_") else token.split("_", 1)[1] if "_" in token else token ) # Check for minimum entropy (character diversity) unique_chars = len(set(token_body.lower())) if unique_chars < 15: # GitHub tokens should have high entropy raise ValueError( "GitHub token appears to have insufficient entropy. Please ensure " "you're using a real GitHub token." ) # Check for obvious fake patterns fake_patterns = ["test", "fake", "demo", "example", "placeholder", "your_token"] if any(pattern in token.lower() for pattern in fake_patterns): raise ValueError( "GitHub token appears to be a placeholder or test value. Please " "use a real GitHub token." 
) def __init__(self, **kwargs: Any) -> None: # Set GitHub Models configuration kwargs.setdefault("base_url", "https://models.github.ai/inference") if "api_key" not in kwargs: kwargs["api_key"] = os.environ.get("GITHUB_TOKEN") api_key = kwargs.get("api_key") if not api_key: raise ValueError( "GitHub token is required but not found in GITHUB_TOKEN " "environment variable" ) if isinstance(api_key, str): self._validate_github_token(api_key) super().__init__(**kwargs) @dataclass(frozen=True) class ModelConfig: """Immutable configuration for model provider and settings.""" provider_class: type[Model] enhanced_model_id: str standard_model_id: str api_key: str | None = None def create_enhanced_model(self) -> Model: """Create enhanced model instance (used for complex synthesis like Blue Hat).""" # Enable prompt caching for Anthropic models if self.provider_class == Claude: return self.provider_class( id=self.enhanced_model_id, # Note: cache_system_prompt removed - not available in current Agno version ) return self.provider_class(id=self.enhanced_model_id) def create_standard_model(self) -> Model: """Create standard model instance (used for individual hat processing).""" # Enable prompt caching for Anthropic models if self.provider_class == Claude: return self.provider_class( id=self.standard_model_id, # Note: cache_system_prompt removed - not available in current Agno version ) return self.provider_class(id=self.standard_model_id) @runtime_checkable class ConfigurationStrategy(Protocol): """Protocol defining configuration strategy interface.""" def get_config(self) -> ModelConfig: """Return model configuration for this strategy.""" ... def get_required_environment_variables(self) -> dict[str, bool]: """Return required environment variables and whether they're optional.""" ... 
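# --- Added illustrative sketch (not part of the original module) ---
# Any object exposing these two methods satisfies ConfigurationStrategy.
# A minimal local-only strategy might look like this (the model IDs are
# hypothetical; Ollama and ModelConfig are already in scope above):
class _ExampleLocalStrategy:
    def get_config(self) -> ModelConfig:
        return ModelConfig(
            provider_class=Ollama,
            enhanced_model_id="llama3.1:70b",
            standard_model_id="llama3.1:8b",
        )

    def get_required_environment_variables(self) -> dict[str, bool]:
        return {}  # no API key needed, mirroring the Ollama strategy

# It could then be registered through the public API defined at the bottom
# of this module:
#   register_provider_strategy("example_local", _ExampleLocalStrategy())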
class BaseProviderStrategy(ABC): """Abstract base strategy with common functionality.""" @property @abstractmethod def provider_class(self) -> type[Model]: """Return the provider model class.""" @property @abstractmethod def default_enhanced_model(self) -> str: """Return default enhanced model ID (for complex synthesis).""" @property @abstractmethod def default_standard_model(self) -> str: """Return default standard model ID (for individual processing).""" @property @abstractmethod def api_key_name(self) -> str | None: """Return API key environment variable name.""" def _get_env_with_fallback(self, env_var: str, fallback: str) -> str: """Get environment variable with fallback to default.""" value = os.environ.get(env_var, "").strip() return value if value else fallback def get_config(self) -> ModelConfig: """Get complete configuration with environment overrides.""" prefix = self.__class__.__name__.replace("Strategy", "").upper() enhanced_model = self._get_env_with_fallback( f"{prefix}_ENHANCED_MODEL_ID", self.default_enhanced_model ) standard_model = self._get_env_with_fallback( f"{prefix}_STANDARD_MODEL_ID", self.default_standard_model ) # Get API key with enhanced validation and None handling api_key: str | None = None if self.api_key_name: api_key = os.environ.get(self.api_key_name, "").strip() api_key = api_key if api_key else None return ModelConfig( provider_class=self.provider_class, enhanced_model_id=enhanced_model, standard_model_id=standard_model, api_key=api_key, ) def get_required_environment_variables(self) -> dict[str, bool]: """Return required environment variables.""" env_vars: dict[str, bool] = {} if self.api_key_name: env_vars[self.api_key_name] = False # Required # Enhanced/standard environment variables (optional) prefix = self.__class__.__name__.replace("Strategy", "").upper() env_vars[f"{prefix}_ENHANCED_MODEL_ID"] = True # Optional env_vars[f"{prefix}_STANDARD_MODEL_ID"] = True # Optional return env_vars # Concrete strategy implementations class DeepSeekStrategy(BaseProviderStrategy): """DeepSeek provider strategy.""" provider_class = DeepSeek default_enhanced_model = "deepseek-chat" default_standard_model = "deepseek-chat" api_key_name = "DEEPSEEK_API_KEY" class GroqStrategy(BaseProviderStrategy): """Groq provider strategy.""" provider_class = Groq default_enhanced_model = "openai/gpt-oss-120b" default_standard_model = "openai/gpt-oss-20b" api_key_name = "GROQ_API_KEY" class OpenRouterStrategy(BaseProviderStrategy): """OpenRouter provider strategy.""" provider_class = OpenRouter default_enhanced_model = "deepseek/deepseek-chat-v3-0324" default_standard_model = "deepseek/deepseek-r1" api_key_name = "OPENROUTER_API_KEY" class OllamaStrategy(BaseProviderStrategy): """Ollama provider strategy (no API key required).""" provider_class = Ollama default_enhanced_model = "devstral:24b" default_standard_model = "devstral:24b" api_key_name = None class GitHubStrategy(BaseProviderStrategy): """GitHub Models provider strategy.""" @property def provider_class(self) -> type[Model]: return GitHubOpenAI @property def default_enhanced_model(self) -> str: return "openai/gpt-5" @property def default_standard_model(self) -> str: return "openai/gpt-5-min" @property def api_key_name(self) -> str: return "GITHUB_TOKEN" class AnthropicStrategy(BaseProviderStrategy): """Anthropic Claude provider strategy with prompt caching enabled.""" @property def provider_class(self) -> type[Model]: return Claude @property def default_enhanced_model(self) -> str: return "claude-3-5-sonnet-20241022" 
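    # Added note: BaseProviderStrategy derives the environment-variable prefix
    # from the class name ("AnthropicStrategy" -> "ANTHROPIC"), so this
    # provider's defaults can be overridden with ANTHROPIC_ENHANCED_MODEL_ID /
    # ANTHROPIC_STANDARD_MODEL_ID, e.g. (hypothetical value):
    #   export ANTHROPIC_ENHANCED_MODEL_ID="claude-3-opus-20240229"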
@property def default_standard_model(self) -> str: return "claude-3-5-haiku-20241022" @property def api_key_name(self) -> str: return "ANTHROPIC_API_KEY" class ConfigurationManager: """Manages configuration strategies with dependency injection.""" def __init__(self) -> None: self._strategies = { "deepseek": DeepSeekStrategy(), "groq": GroqStrategy(), "openrouter": OpenRouterStrategy(), "ollama": OllamaStrategy(), "github": GitHubStrategy(), "anthropic": AnthropicStrategy(), } self._default_strategy = "deepseek" def register_strategy(self, name: str, strategy: ConfigurationStrategy) -> None: """Register a new configuration strategy.""" self._strategies[name] = strategy def get_strategy(self, provider_name: str | None = None) -> ConfigurationStrategy: """Get strategy for specified provider.""" if provider_name is None: provider_name = os.environ.get("LLM_PROVIDER", self._default_strategy) provider_name = provider_name.lower() if provider_name not in self._strategies: available = list(self._strategies.keys()) raise ValueError( f"Unknown provider: {provider_name}. Available: {available}" ) return self._strategies[provider_name] def get_model_config(self, provider_name: str | None = None) -> ModelConfig: """Get model configuration using dependency injection.""" strategy = self.get_strategy(provider_name) return strategy.get_config() def validate_environment(self, provider_name: str | None = None) -> dict[str, str]: """Validate environment variables and return missing required ones.""" strategy = self.get_strategy(provider_name) required_vars = strategy.get_required_environment_variables() missing: dict[str, str] = {} for var_name, is_optional in required_vars.items(): if not is_optional and not os.environ.get(var_name): missing[var_name] = "Required but not set" # Check EXA API key for research functionality (optional) # Note: EXA tools will be disabled if key is not provided exa_key = os.environ.get("EXA_API_KEY") if not exa_key: # Don't fail startup - just log warning that research will be disabled import logging logging.getLogger(__name__).warning( "EXA_API_KEY not found. Research tools will be disabled." ) return missing def get_available_providers(self) -> list[str]: """Get list of available provider names.""" return list(self._strategies.keys()) # Singleton instance for dependency injection _config_manager = ConfigurationManager() # Public API functions def get_model_config(provider_name: str | None = None) -> ModelConfig: """Get model configuration using modernized configuration management.""" return _config_manager.get_model_config(provider_name) def check_required_api_keys(provider_name: str | None = None) -> list[str]: """Check for missing required API keys.""" missing_vars = _config_manager.validate_environment(provider_name) return list(missing_vars.keys()) def register_provider_strategy(name: str, strategy: ConfigurationStrategy) -> None: """Register a custom provider strategy.""" _config_manager.register_strategy(name, strategy) def get_available_providers() -> list[str]: """Get list of available providers.""" return _config_manager.get_available_providers() ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/routing/agno_workflow_router.py: -------------------------------------------------------------------------------- ```python """Multi-Thinking Workflow Router - Complete Rewrite. 
A clean Multi-Thinking workflow implementation built on the Agno v2.0 framework. The legacy complexity-based routing has been removed entirely; this module focuses solely on the Multi-Thinking methodology. """ import logging import re import time from dataclasses import dataclass from typing import Any from agno.workflow.router import Router from agno.workflow.step import Step from agno.workflow.types import StepInput, StepOutput from agno.workflow.workflow import Workflow from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config # Import at module level - moved from function to avoid PLC0415 from mcp_server_mas_sequential_thinking.core.models import ThoughtData from mcp_server_mas_sequential_thinking.processors.multi_thinking_processor import ( MultiThinkingSequentialProcessor, create_multi_thinking_step_output, ) logger = logging.getLogger(__name__) @dataclass class MultiThinkingWorkflowResult: """Result from Multi-Thinking workflow execution.""" content: str strategy_used: str processing_time: float complexity_score: float step_name: str thinking_sequence: list[str] cost_reduction: float class MultiThinkingWorkflowRouter: """Pure Multi-Thinking workflow router using Agno v2.0.""" def __init__(self) -> None: """Initialize Multi-Thinking workflow router.""" self.model_config = get_model_config() # Initialize Multi-Thinking processor self.multi_thinking_processor = MultiThinkingSequentialProcessor() # Create Multi-Thinking processing step self.multi_thinking_step = self._create_multi_thinking_step() # Create router that always selects Multi-Thinking self.router = Router( name="multi_thinking_router", selector=self._multi_thinking_selector, choices=[self.multi_thinking_step], ) # Create main workflow self.workflow = Workflow( name="multi_thinking_workflow", steps=[self.router], ) logger.info("Multi-Thinking Workflow Router initialized") def _multi_thinking_selector(self, step_input: StepInput) -> list[Step]: """Selector that always returns Multi-Thinking processing.""" try: logger.info("🎩 MULTI-THINKING WORKFLOW ROUTING:") # Extract thought content for logging if isinstance(step_input.input, dict): thought_content = step_input.input.get("thought", "") thought_number = step_input.input.get("thought_number", 1) total_thoughts = step_input.input.get("total_thoughts", 1) else: thought_content = str(step_input.input) thought_number = 1 total_thoughts = 1 logger.info( " 📝 Input: %s%s", thought_content[:100], "..."
if len(thought_content) > 100 else "", ) logger.info(" 🔢 Progress: %s/%s", thought_number, total_thoughts) logger.info(" ✅ Multi-Thinking selected - exclusive thinking methodology") return [self.multi_thinking_step] except Exception as e: logger.exception("Error in Multi-Thinking selector: %s", e) logger.warning("Continuing with Multi-Thinking processing despite error") return [self.multi_thinking_step] def _create_multi_thinking_step(self) -> Step: """Create Six Thinking Hats processing step.""" async def multi_thinking_executor( step_input: StepInput, session_state: dict[str, Any] ) -> StepOutput: """Execute Multi-Thinking thinking process.""" try: logger.info("🎩 MULTI-THINKING STEP EXECUTION:") # Extract thought content and metadata if isinstance(step_input.input, dict): thought_content = step_input.input.get( "thought", str(step_input.input) ) thought_number = step_input.input.get("thought_number", 1) total_thoughts = step_input.input.get("total_thoughts", 1) context = step_input.input.get("context", "") else: thought_content = str(step_input.input) thought_number = 1 total_thoughts = 1 context = "" # ThoughtData is now imported at module level # Extract context preservation fields from input if available if isinstance(step_input.input, dict): is_revision = step_input.input.get("isRevision", False) branch_from_thought = step_input.input.get("branchFromThought") branch_id = step_input.input.get("branchId") needs_more_thoughts = step_input.input.get( "needsMoreThoughts", False ) next_thought_needed = step_input.input.get( "nextThoughtNeeded", True ) else: is_revision = False branch_from_thought = None branch_id = None needs_more_thoughts = False next_thought_needed = True thought_data = ThoughtData( thought=thought_content, thoughtNumber=thought_number, totalThoughts=total_thoughts, nextThoughtNeeded=next_thought_needed, isRevision=is_revision, branchFromThought=branch_from_thought, branchId=branch_id, needsMoreThoughts=needs_more_thoughts, ) logger.info(" 📝 Input: %s...", thought_content[:100]) logger.info(" 🔢 Thought: %s/%s", thought_number, total_thoughts) # Process with Multi-Thinking result = ( await self.multi_thinking_processor.process_with_multi_thinking( thought_data, context ) ) # Store metadata in session_state session_state["current_strategy"] = result.strategy_used session_state["current_complexity_score"] = result.complexity_score session_state["thinking_sequence"] = result.thinking_sequence session_state["cost_reduction"] = result.cost_reduction logger.info(" ✅ Multi-Thinking completed: %s", result.strategy_used) logger.info(" 📊 Complexity: %.1f", result.complexity_score) logger.info(" 💰 Cost Reduction: %.1f%%", result.cost_reduction) return create_multi_thinking_step_output(result) except Exception as e: logger.exception(" ❌ Multi-Thinking execution failed") return StepOutput( content=f"Multi-Thinking processing failed: {e!s}", success=False, error=str(e), step_name="multi_thinking_error", ) return Step( name="multi_thinking_processing", executor=multi_thinking_executor, description="Six Thinking Hats sequential processing", ) async def process_thought_workflow( self, thought_data: "ThoughtData", context_prompt: str ) -> MultiThinkingWorkflowResult: """Process thought using Multi-Thinking workflow.""" start_time = time.time() try: logger.info("🚀 MULTI-THINKING WORKFLOW INITIALIZATION:") logger.info( " 📝 Thought: %s%s", thought_data.thought[:100], "..." 
if len(thought_data.thought) > 100 else "", ) logger.info( " 🔢 Thought Number: %s/%s", thought_data.thoughtNumber, thought_data.totalThoughts, ) logger.info(" 📋 Context Length: %d chars", len(context_prompt)) logger.info( " ⏰ Start Time: %s", time.strftime("%H:%M:%S", time.localtime(start_time)), ) # Prepare workflow input for Multi-Thinking workflow_input = { "thought": thought_data.thought, "thought_number": thought_data.thoughtNumber, "total_thoughts": thought_data.totalThoughts, "context": context_prompt, } logger.info("📦 WORKFLOW INPUT PREPARATION:") logger.info(" 📊 Input Keys: %s", list(workflow_input.keys())) logger.info(" 📏 Input Size: %d chars", len(str(workflow_input))) # Initialize session_state for metadata tracking session_state: dict[str, Any] = { "start_time": start_time, "thought_number": thought_data.thoughtNumber, "total_thoughts": thought_data.totalThoughts, } logger.info("🎯 SESSION STATE SETUP:") logger.info(" 🔑 State Keys: %s", list(session_state.keys())) logger.info(" 📈 Metadata: %s", session_state) logger.info( "▶️ EXECUTING Multi-Thinking workflow for thought #%s", thought_data.thoughtNumber, ) # Execute Multi-Thinking workflow logger.info("🔄 WORKFLOW EXECUTION START...") result = await self.workflow.arun( input=workflow_input, session_state=session_state ) logger.info("✅ WORKFLOW EXECUTION COMPLETED") processing_time = time.time() - start_time # Extract clean content from result content = self._extract_clean_content(result) logger.info("📋 CONTENT VALIDATION:") logger.info( " ✅ Content extracted successfully: %d characters", len(content) ) logger.info( " 📝 Content preview: %s%s", content[:150], "..." if len(content) > 150 else "", ) # Get metadata from session_state complexity_score = session_state.get("current_complexity_score", 0.0) strategy_used = session_state.get("current_strategy", "multi_thinking") thinking_sequence = session_state.get("thinking_sequence", []) cost_reduction = session_state.get("cost_reduction", 0.0) logger.info("📊 WORKFLOW RESULT COMPILATION:") logger.info(" 🎯 Strategy used: %s", strategy_used) logger.info(" 🧠 Thinking sequence: %s", " → ".join(thinking_sequence)) logger.info(" 📈 Complexity score: %.1f", complexity_score) logger.info(" 💰 Cost reduction: %.1f%%", cost_reduction) logger.info(" ⏱️ Processing time: %.3fs", processing_time) workflow_result = MultiThinkingWorkflowResult( content=content, strategy_used=strategy_used, processing_time=processing_time, complexity_score=complexity_score, step_name="multi_thinking_execution", thinking_sequence=thinking_sequence, cost_reduction=cost_reduction, ) logger.info("🎉 MULTI-THINKING WORKFLOW COMPLETION:") logger.info( " ✅ Completed: strategy=%s, time=%.3fs, score=%.1f, reduction=%.1f%%", strategy_used, processing_time, complexity_score, cost_reduction, ) return workflow_result except Exception as e: processing_time = time.time() - start_time logger.exception( "Multi-Thinking workflow execution failed after %.3fs", processing_time ) return MultiThinkingWorkflowResult( content=f"Error processing thought with Multi-Thinking: {e!s}", strategy_used="error_fallback", processing_time=processing_time, complexity_score=0.0, step_name="error_handling", thinking_sequence=[], cost_reduction=0.0, ) def _extract_clean_content(self, result: Any) -> str: """Extract clean content from workflow result.""" def extract_recursive(obj: Any, depth: int = 0) -> str: """Recursively extract clean content from nested objects.""" if depth > 10: # Prevent infinite recursion return str(obj) # Handle dictionary with common 
content keys if isinstance(obj, dict): for key in [ "result", "content", "message", "text", "response", "output", "answer", ]: if key in obj: return extract_recursive(obj[key], depth + 1) # Fallback to any string content for value in obj.values(): if isinstance(value, str) and len(value.strip()) > 10: return value.strip() return str(obj) # Handle objects with content attributes if hasattr(obj, "content"): content = obj.content if isinstance(content, str): return content.strip() return extract_recursive(content, depth + 1) # Handle other output objects if hasattr(obj, "output"): return extract_recursive(obj.output, depth + 1) # Handle list/tuple - extract first meaningful content if isinstance(obj, (list, tuple)) and obj: for item in obj: result = extract_recursive(item, depth + 1) if isinstance(result, str) and len(result.strip()) > 10: return result.strip() # If it's already a string, clean it up if isinstance(obj, str): content = obj.strip() # Remove object representations if any( content.startswith(pattern) for pattern in [ "RunOutput(", "TeamRunOutput(", "StepOutput(", "WorkflowResult(", "{'result':", '{"result":', "{'content':", '{"content":', ] ): # Try to extract content using regex (re imported at module level) patterns = [ (r"content='([^']*)'", 1), (r'content="([^"]*)"', 1), (r"'result':\s*'([^']*)'", 1), (r'"result":\s*"([^"]*)"', 1), (r"'([^']{20,})'", 1), (r'"([^"]{20,})"', 1), ] for pattern, group in patterns: match = re.search(pattern, content) if match: extracted = match.group(group).strip() if len(extracted) > 10: return extracted # Clean up object syntax cleaned = re.sub(r'[{}()"\']', " ", content) cleaned = re.sub( r"\b(RunOutput|TeamRunOutput|StepOutput|content|result|success|error)\b", " ", cleaned, ) cleaned = re.sub(r"\s+", " ", cleaned).strip() if len(cleaned) > 20: return cleaned return content # Fallback result = str(obj).strip() if len(result) > 20 and not any( result.startswith(pattern) for pattern in ["RunOutput(", "TeamRunOutput(", "StepOutput(", "<"] ): return result return "Multi-Thinking processing completed successfully" return extract_recursive(result) # For backward compatibility with the old AgnoWorkflowRouter name AgnoWorkflowRouter = MultiThinkingWorkflowRouter WorkflowResult = MultiThinkingWorkflowResult ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/infrastructure/persistent_memory.py: -------------------------------------------------------------------------------- ```python """Persistent memory management with SQLAlchemy and memory pruning.""" import os from datetime import datetime, timedelta from pathlib import Path from sqlalchemy import ( JSON, Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, String, Text, create_engine, desc, ) from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker from sqlalchemy.pool import StaticPool from mcp_server_mas_sequential_thinking.config.constants import ( DatabaseConstants, DefaultSettings, ) from mcp_server_mas_sequential_thinking.core.models import ThoughtData from .logging_config import get_logger logger = get_logger(__name__) class Base(DeclarativeBase): """Base class for SQLAlchemy models with modern typing support.""" class SessionRecord(Base): """Database model for session storage.""" __tablename__ = "sessions" id = Column(String, primary_key=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) 
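    # Aggregate session stats; _update_session_stats refreshes these
    # whenever a new thought is stored for the session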
total_thoughts = Column(Integer, default=0) total_cost = Column(Float, default=0.0) provider = Column(String, default="deepseek") # Relationship to thoughts thoughts = relationship( "ThoughtRecord", back_populates="session", cascade="all, delete-orphan" ) # Index for performance __table_args__ = ( Index("ix_session_created", "created_at"), Index("ix_session_updated", "updated_at"), ) class ThoughtRecord(Base): """Database model for thought storage.""" __tablename__ = "thoughts" id = Column(Integer, primary_key=True) session_id = Column(String, ForeignKey("sessions.id"), nullable=False) thought_number = Column(Integer, nullable=False) thought = Column(Text, nullable=False) total_thoughts = Column(Integer, nullable=False) next_needed = Column(Boolean, nullable=False) branch_from = Column(Integer, nullable=True) branch_id = Column(String, nullable=True) # Processing metadata processing_strategy = Column(String, nullable=True) complexity_score = Column(Float, nullable=True) estimated_cost = Column(Float, nullable=True) actual_cost = Column(Float, nullable=True) token_usage = Column(Integer, nullable=True) processing_time = Column(Float, nullable=True) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) processed_at = Column(DateTime, nullable=True) # Response data response = Column(Text, nullable=True) specialist_used = Column(JSON, nullable=True) # List of specialists used # Relationship session = relationship("SessionRecord", back_populates="thoughts") # Indexes for performance __table_args__ = ( Index("ix_thought_session", "session_id"), Index("ix_thought_number", "thought_number"), Index("ix_thought_created", "created_at"), Index("ix_thought_branch", "branch_from", "branch_id"), ) def to_thought_data(self) -> ThoughtData: """Convert database record to ThoughtData model.""" return ThoughtData( thought=str(self.thought), thoughtNumber=int(self.thought_number), totalThoughts=int(self.total_thoughts), nextThoughtNeeded=bool(self.next_needed), isRevision=False, # Assuming default value is intentional branchFromThought=self.branch_from, branchId=self.branch_id, needsMoreThoughts=True, # Assuming default value is intentional ) class BranchRecord(Base): """Database model for branch tracking.""" __tablename__ = "branches" id = Column(Integer, primary_key=True) session_id = Column(String, ForeignKey("sessions.id"), nullable=False) branch_id = Column(String, nullable=False) parent_thought = Column(Integer, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) thought_count = Column(Integer, default=0) __table_args__ = ( Index("ix_branch_session", "session_id"), Index("ix_branch_id", "branch_id"), ) class UsageMetrics(Base): """Database model for usage tracking and cost optimization.""" __tablename__ = "usage_metrics" id = Column(Integer, primary_key=True) date = Column(DateTime, default=datetime.utcnow) provider = Column(String, nullable=False) processing_strategy = Column(String, nullable=False) complexity_level = Column(String, nullable=False) # Metrics thought_count = Column(Integer, default=0) total_tokens = Column(Integer, default=0) total_cost = Column(Float, default=0.0) avg_processing_time = Column(Float, default=0.0) success_rate = Column(Float, default=1.0) # Performance tracking token_efficiency = Column(Float, default=0.0) # quality/token ratio cost_effectiveness = Column(Float, default=0.0) # quality/cost ratio __table_args__ = ( Index("ix_metrics_date", "date"), Index("ix_metrics_provider", "provider"), Index("ix_metrics_strategy", "processing_strategy"), 
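        # These indexes back the per-day rollup lookup in record_usage_metrics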
) class PersistentMemoryManager: """Manages persistent storage and memory pruning.""" def __init__(self, database_url: str | None = None) -> None: """Initialize persistent memory manager.""" # Default to local SQLite database if database_url is None: db_dir = Path.home() / ".sequential_thinking" / "data" db_dir.mkdir(parents=True, exist_ok=True) database_url = f"sqlite:///{db_dir}/memory.db" # Configure engine with connection pooling if database_url.startswith("sqlite"): # SQLite-specific configuration self.engine = create_engine( database_url, poolclass=StaticPool, connect_args={"check_same_thread": False}, echo=False, ) else: # PostgreSQL/other database configuration self.engine = create_engine( database_url, pool_pre_ping=True, pool_size=DatabaseConstants.CONNECTION_POOL_SIZE, max_overflow=DatabaseConstants.CONNECTION_POOL_OVERFLOW, ) # Create tables Base.metadata.create_all(self.engine) # Session factory self.SessionLocal = sessionmaker(bind=self.engine) logger.info(f"Persistent memory initialized with database: {database_url}") def create_session( self, session_id: str, provider: str = DefaultSettings.DEFAULT_PROVIDER ) -> None: """Create a new session record.""" with self.SessionLocal() as db: existing = ( db.query(SessionRecord).filter(SessionRecord.id == session_id).first() ) if not existing: session_record = SessionRecord(id=session_id, provider=provider) db.add(session_record) db.commit() logger.info(f"Created new session: {session_id}") def store_thought( self, session_id: str, thought_data: ThoughtData, response: str | None = None, processing_metadata: dict | None = None, ) -> int: """Store a thought and return its database ID.""" with self.SessionLocal() as db: session_record = self._ensure_session_exists(db, session_id) thought_record = self._create_thought_record( session_id, thought_data, response, processing_metadata ) db.add(thought_record) self._update_session_stats(session_record, processing_metadata) self._handle_branching(db, session_id, thought_data) db.commit() return int(thought_record.id) def _ensure_session_exists(self, db: Session, session_id: str) -> SessionRecord: """Ensure session exists in database and return it.""" session_record = ( db.query(SessionRecord).filter(SessionRecord.id == session_id).first() ) if not session_record: self.create_session(session_id) session_record = ( db.query(SessionRecord).filter(SessionRecord.id == session_id).first() ) return session_record def _create_thought_record( self, session_id: str, thought_data: ThoughtData, response: str | None, processing_metadata: dict | None, ) -> ThoughtRecord: """Create a thought record with metadata.""" thought_record = ThoughtRecord( session_id=session_id, thought_number=thought_data.thoughtNumber, thought=thought_data.thought, total_thoughts=thought_data.totalThoughts, next_needed=thought_data.nextThoughtNeeded, branch_from=thought_data.branchFromThought, branch_id=thought_data.branchId, response=response, ) if processing_metadata: self._apply_processing_metadata(thought_record, processing_metadata) return thought_record def _apply_processing_metadata( self, thought_record: ThoughtRecord, metadata: dict ) -> None: """Apply processing metadata to thought record.""" if strategy := metadata.get("strategy"): thought_record.processing_strategy = str(strategy) # type: ignore[assignment] if complexity_score := metadata.get("complexity_score"): thought_record.complexity_score = float(complexity_score) # type: ignore[assignment] if estimated_cost := metadata.get("estimated_cost"): 
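            # Walrus guards skip keys that are missing or falsy,
            # so 0 / 0.0 values are intentionally not stored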
            thought_record.estimated_cost = float(estimated_cost)  # type: ignore[assignment]
        if actual_cost := metadata.get("actual_cost"):
            thought_record.actual_cost = float(actual_cost)  # type: ignore[assignment]
        if token_usage := metadata.get("token_usage"):
            thought_record.token_usage = int(token_usage)  # type: ignore[assignment]
        if processing_time := metadata.get("processing_time"):
            thought_record.processing_time = float(processing_time)  # type: ignore[assignment]

        thought_record.specialist_used = metadata.get("specialists", [])
        thought_record.processed_at = datetime.utcnow()  # type: ignore[assignment]

    def _update_session_stats(
        self, session_record: SessionRecord, processing_metadata: dict | None
    ) -> None:
        """Update session statistics."""
        if session_record:
            session_record.total_thoughts += 1  # type: ignore[assignment]
            session_record.updated_at = datetime.utcnow()  # type: ignore[assignment]

            if processing_metadata and processing_metadata.get("actual_cost"):
                session_record.total_cost += float(processing_metadata["actual_cost"])  # type: ignore[assignment]

    def _handle_branching(
        self, db: Session, session_id: str, thought_data: ThoughtData
    ) -> None:
        """Handle branch record creation and updates."""
        if not thought_data.branchId:
            return

        branch_record = (
            db.query(BranchRecord)
            .filter(
                BranchRecord.session_id == session_id,
                BranchRecord.branch_id == thought_data.branchId,
            )
            .first()
        )

        if not branch_record:
            branch_record = BranchRecord(
                session_id=session_id,
                branch_id=thought_data.branchId,
                parent_thought=thought_data.branchFromThought,
            )
            db.add(branch_record)

        branch_record.thought_count += 1  # type: ignore[assignment]

    def get_session_thoughts(
        self, session_id: str, limit: int | None = None
    ) -> list[ThoughtRecord]:
        """Retrieve thoughts for a session."""
        with self.SessionLocal() as db:
            query = (
                db.query(ThoughtRecord)
                .filter(ThoughtRecord.session_id == session_id)
                .order_by(ThoughtRecord.thought_number)
            )
            if limit:
                query = query.limit(limit)
            return query.all()

    def get_thought_by_number(
        self, session_id: str, thought_number: int
    ) -> ThoughtRecord | None:
        """Get a specific thought by number."""
        with self.SessionLocal() as db:
            return (
                db.query(ThoughtRecord)
                .filter(
                    ThoughtRecord.session_id == session_id,
                    ThoughtRecord.thought_number == thought_number,
                )
                .first()
            )

    def get_branch_thoughts(
        self, session_id: str, branch_id: str
    ) -> list[ThoughtRecord]:
        """Get all thoughts in a specific branch."""
        with self.SessionLocal() as db:
            return (
                db.query(ThoughtRecord)
                .filter(
                    ThoughtRecord.session_id == session_id,
                    ThoughtRecord.branch_id == branch_id,
                )
                .order_by(ThoughtRecord.thought_number)
                .all()
            )

    def prune_old_sessions(
        self, older_than_days: int = 30, keep_recent: int = 100
    ) -> int:
        """Prune old sessions to manage storage space."""
        cutoff_date = datetime.utcnow() - timedelta(days=older_than_days)

        with self.SessionLocal() as db:
            # Get sessions older than cutoff, excluding most recent ones
            old_sessions = (
                db.query(SessionRecord)
                .filter(SessionRecord.updated_at < cutoff_date)
                .order_by(desc(SessionRecord.updated_at))
                .offset(keep_recent)
            )

            deleted_count = 0
            for session in old_sessions:
                db.delete(session)  # Cascade will handle thoughts and branches
                deleted_count += 1

            if deleted_count > 0:
                db.commit()
                logger.info(f"Pruned {deleted_count} old sessions")

            return deleted_count

    def get_usage_stats(self, days_back: int = 7) -> dict:
        """Get usage statistics for cost optimization."""
        cutoff_date = datetime.utcnow() - timedelta(days=days_back)

        with self.SessionLocal() as db:
            # Session stats
            session_count = (
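                # Count only sessions created inside the reporting window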
db.query(SessionRecord) .filter(SessionRecord.created_at >= cutoff_date) .count() ) # Thought stats thought_stats = ( db.query(ThoughtRecord) .filter(ThoughtRecord.created_at >= cutoff_date) .all() ) total_thoughts = len(thought_stats) # Explicit type casting to resolve SQLAlchemy Column type issues total_cost: float = float( sum(float(t.actual_cost or 0) for t in thought_stats) ) total_tokens: int = int(sum(int(t.token_usage or 0) for t in thought_stats)) avg_processing_time = sum( t.processing_time or 0 for t in thought_stats ) / max(total_thoughts, 1) # Strategy breakdown strategy_stats = {} for thought in thought_stats: strategy = thought.processing_strategy or "unknown" if strategy not in strategy_stats: strategy_stats[strategy] = {"count": 0, "cost": 0, "tokens": 0} strategy_stats[strategy]["count"] += 1 strategy_stats[strategy]["cost"] += thought.actual_cost or 0 strategy_stats[strategy]["tokens"] += thought.token_usage or 0 return { "period_days": days_back, "session_count": session_count, "total_thoughts": total_thoughts, "total_cost": total_cost, "total_tokens": total_tokens, "avg_cost_per_thought": total_cost / max(total_thoughts, 1), "avg_tokens_per_thought": total_tokens / max(total_thoughts, 1), "avg_processing_time": avg_processing_time, "strategy_breakdown": strategy_stats, } def record_usage_metrics( self, provider: str, processing_strategy: str, complexity_level: str, thought_count: int = 1, tokens: int = 0, cost: float = 0.0, processing_time: float = 0.0, success: bool = True, ) -> None: """Record usage metrics for cost optimization.""" with self.SessionLocal() as db: # Check if we have a record for today today = datetime.utcnow().date() existing = ( db.query(UsageMetrics) .filter( UsageMetrics.date >= datetime.combine(today, datetime.min.time()), UsageMetrics.provider == provider, UsageMetrics.processing_strategy == processing_strategy, UsageMetrics.complexity_level == complexity_level, ) .first() ) if existing: # Update existing record existing.thought_count += thought_count # type: ignore[assignment] existing.total_tokens += tokens # type: ignore[assignment] existing.total_cost += cost # type: ignore[assignment] existing.avg_processing_time = ( # type: ignore[assignment] float(existing.avg_processing_time) * (existing.thought_count - thought_count) + processing_time * thought_count ) / existing.thought_count if not success: existing.success_rate = ( # type: ignore[assignment] float(existing.success_rate) * (existing.thought_count - thought_count) ) / existing.thought_count else: # Create new record metrics = UsageMetrics( provider=provider, processing_strategy=processing_strategy, complexity_level=complexity_level, thought_count=thought_count, total_tokens=tokens, total_cost=cost, avg_processing_time=processing_time, success_rate=1.0 if success else 0.0, ) db.add(metrics) db.commit() def optimize_database(self) -> None: """Run database optimization tasks.""" with self.SessionLocal() as db: from sqlalchemy import text if self.engine.dialect.name == "sqlite": db.execute(text("VACUUM")) db.execute(text("ANALYZE")) else: db.execute(text("ANALYZE")) db.commit() logger.info("Database optimization completed") def close(self) -> None: """Close database connections.""" self.engine.dispose() # Convenience functions def create_persistent_memory( database_url: str | None = None, ) -> PersistentMemoryManager: """Create a persistent memory manager instance.""" return PersistentMemoryManager(database_url) def get_database_url_from_env() -> str: """Get database URL from environment 
variables.""" if url := os.getenv("DATABASE_URL"): return url # Default to local SQLite db_dir = Path.home() / ".sequential_thinking" / "data" db_dir.mkdir(parents=True, exist_ok=True) return f"sqlite:///{db_dir}/memory.db" ```
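As a usage reference, here is a minimal sketch (not part of the repository) showing how the `PersistentMemoryManager` above might be driven end to end. It assumes the package is installed and importable; the session id, thought text, and metadata values are purely illustrative, and any `ThoughtData` validation rules beyond the fields shown in this file are not verified here.

```python
from mcp_server_mas_sequential_thinking.core.models import ThoughtData
from mcp_server_mas_sequential_thinking.infrastructure.persistent_memory import (
    create_persistent_memory,
)

# Uses the default local SQLite database (~/.sequential_thinking/data/memory.db)
memory = create_persistent_memory()
memory.create_session("demo-session", provider="deepseek")

thought = ThoughtData(
    thought="Outline the trade-offs of SQLite vs. PostgreSQL for this server.",
    thoughtNumber=1,
    totalThoughts=5,
    nextThoughtNeeded=True,
    isRevision=False,
    branchFromThought=None,
    branchId=None,
    needsMoreThoughts=False,
)

# Metadata keys mirror what _apply_processing_metadata consumes
memory.store_thought(
    "demo-session",
    thought,
    response="SQLite suffices for a single local process; Postgres for shared use.",
    processing_metadata={
        "strategy": "multi_thinking",
        "complexity_score": 6.2,
        "actual_cost": 0.0004,
        "token_usage": 812,
        "processing_time": 2.31,
        "specialists": ["blue", "green"],
    },
)

# Aggregated stats over the last 7 days
stats = memory.get_usage_stats(days_back=7)
print(stats["total_thoughts"], stats["avg_cost_per_thought"])

# Housekeeping mirrors the MEMORY_PRUNING_DAYS / MEMORY_KEEP_RECENT settings
memory.prune_old_sessions(older_than_days=30, keep_recent=100)
memory.close()
```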