#
tokens: 47592/50000 37/39 files (page 1/2)
lines: off (toggle) GitHub
raw markdown copy
This is page 1 of 2. Use http://codebase.md/fradser/mcp-server-mas-sequential-thinking?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .env.example
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CLAUDE.md
├── Dockerfile
├── pyproject.toml
├── README.md
├── README.zh-CN.md
├── smithery.yaml
├── src
│   └── mcp_server_mas_sequential_thinking
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   ├── constants.py
│       │   └── modernized_config.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── models.py
│       │   ├── session.py
│       │   └── types.py
│       ├── infrastructure
│       │   ├── __init__.py
│       │   ├── logging_config.py
│       │   └── persistent_memory.py
│       ├── main.py
│       ├── processors
│       │   ├── __init__.py
│       │   ├── multi_thinking_core.py
│       │   └── multi_thinking_processor.py
│       ├── routing
│       │   ├── __init__.py
│       │   ├── agno_workflow_router.py
│       │   ├── ai_complexity_analyzer.py
│       │   ├── complexity_types.py
│       │   └── multi_thinking_router.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_builder.py
│       │   ├── processing_orchestrator.py
│       │   ├── response_formatter.py
│       │   ├── response_processor.py
│       │   ├── retry_handler.py
│       │   ├── server_core.py
│       │   ├── thought_processor_refactored.py
│       │   └── workflow_executor.py
│       └── utils
│           ├── __init__.py
│           └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env

# Testing
.hypothesis/

```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# --- LLM Configuration ---
# Select the LLM provider: "deepseek" (default), "groq", "openrouter", "github", or "ollama"
LLM_PROVIDER="deepseek"

# Provide the API key for the chosen provider:
# GROQ_API_KEY="your_groq_api_key"
DEEPSEEK_API_KEY="your_deepseek_api_key"
# OPENROUTER_API_KEY="your_openrouter_api_key"
# GITHUB_TOKEN="ghp_your_github_personal_access_token"
# Note: Ollama requires no API key but needs local installation
# Note: GitHub Models requires a GitHub Personal Access Token with appropriate scopes

# Optional: Base URL override (e.g., for custom endpoints)
LLM_BASE_URL="your_base_url_if_needed"

# Optional: Specify different models for Enhanced (Complex Synthesis) and Standard (Individual Processing)
# Defaults are set within the code based on the provider if these are not set.
# Example for Groq:
# GROQ_ENHANCED_MODEL_ID="openai/gpt-oss-120b"  # For complex synthesis
# GROQ_STANDARD_MODEL_ID="openai/gpt-oss-20b"  # For individual processing
# Example for DeepSeek:
# DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat"  # For complex synthesis
# DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat"  # For individual processing
# Example for GitHub Models:
# GITHUB_ENHANCED_MODEL_ID="openai/gpt-5"  # For complex synthesis
# GITHUB_STANDARD_MODEL_ID="openai/gpt-5-min"  # For individual processing
# Example for OpenRouter:
# OPENROUTER_ENHANCED_MODEL_ID="deepseek/deepseek-chat-v3-0324"  # For synthesis
# OPENROUTER_STANDARD_MODEL_ID="deepseek/deepseek-r1"  # For processing
# Example for Ollama:
# OLLAMA_ENHANCED_MODEL_ID="devstral:24b"  # For complex synthesis
# OLLAMA_STANDARD_MODEL_ID="devstral:24b"  # For individual processing

# --- Enhanced Agno 1.8+ Features ---
# Enable enhanced agents with advanced reasoning, memory, and structured outputs
USE_ENHANCED_AGENTS="true"

# Team mode: "standard", "enhanced", "hybrid", or "multi_thinking"
# standard: Traditional agent setup (backward compatible)
# enhanced: Use new Agno 1.8+ features (memory, reasoning, structured outputs)
# hybrid: Mix of standard and enhanced agents for optimal performance
# multi_thinking: Use Multi-Thinking methodology for balanced thinking (recommended)
TEAM_MODE="standard"

# --- External Tools ---
# Required ONLY if web research capabilities are needed for thinking agents
EXA_API_KEY="your_exa_api_key"

# --- Adaptive Routing & Cost Optimization ---
# Enable adaptive routing (automatically selects single vs multi-agent based on complexity)
ENABLE_ADAPTIVE_ROUTING="true"

# Multi-Thinking intelligent routing
ENABLE_MULTI_THINKING="true"  # Enable Multi-Thinking methodology for enhanced thinking
MULTI_THINKING_COMPLEXITY_THRESHOLD="5.0"  # Minimum complexity to trigger Multi-Thinking processing

# Cost optimization settings
DAILY_BUDGET_LIMIT=""  # e.g., "5.0" for $5 daily limit
MONTHLY_BUDGET_LIMIT=""  # e.g., "50.0" for $50 monthly limit
PER_THOUGHT_BUDGET_LIMIT=""  # e.g., "0.10" for $0.10 per thought limit
QUALITY_THRESHOLD="0.7"  # Minimum quality score (0.0-1.0)

# Response optimization settings
RESPONSE_STYLE="practical"  # "practical", "academic", or "balanced"
MAX_RESPONSE_LENGTH="800"  # Maximum response length in words
ENFORCE_SIMPLICITY="true"  # Remove excessive academic complexity

# Provider cost overrides (cost per 1K tokens)
# DEEPSEEK_COST_PER_1K_TOKENS="0.0002"
# GROQ_COST_PER_1K_TOKENS="0.0"
# GITHUB_COST_PER_1K_TOKENS="0.0005"
# OPENROUTER_COST_PER_1K_TOKENS="0.001"
# OLLAMA_COST_PER_1K_TOKENS="0.0"

# --- Persistent Memory ---
# Database URL for persistent storage (defaults to local SQLite)
DATABASE_URL=""  # e.g., "postgresql://user:pass@localhost/dbname" or leave empty for SQLite

# Memory management
MEMORY_PRUNING_DAYS="30"  # Prune sessions older than X days
MEMORY_KEEP_RECENT="100"  # Always keep X most recent sessions

# Note: Logs are stored in ~/.mas_sequential_thinking/logs/ directory
# The log file is named mas_sequential_thinking.log with rotation

# --- Logging and Performance ---
# Core logging configuration
LOG_LEVEL="INFO"  # "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
LOG_FORMAT="text"  # "text" (readable) or "json" (structured)
LOG_TARGETS="file,console"  # Comma-separated: "file", "console"
LOG_FILE_MAX_SIZE="10MB"  # Maximum log file size (supports KB, MB, GB)
LOG_FILE_BACKUP_COUNT="5"  # Number of backup log files to keep
LOG_SAMPLING_RATE="1.0"  # Log sampling rate (0.0-1.0, 1.0 = all logs)

# Smart logging configuration (reduces log verbosity while maintaining insights)
SMART_LOGGING="true"  # Enable intelligent logging with adaptive verbosity
SMART_LOG_LEVEL="performance"  # "critical", "performance", "routing", "debug"
LOG_PERFORMANCE_ISSUES="true"  # Always log slow/expensive processing
LOG_RESPONSE_QUALITY="true"  # Log overly complex or academic responses

# Performance and error handling
MAX_RETRIES="3"
TIMEOUT="30.0"
PERFORMANCE_MONITORING="true"  # Enable real-time performance monitoring
PERFORMANCE_BASELINE_TIME="30.0"  # Baseline time per thought in seconds
PERFORMANCE_BASELINE_EFFICIENCY="0.8"  # Target efficiency score
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Sequential Thinking Multi-Agent System (MAS) ![](https://img.shields.io/badge/A%20FRAD%20PRODUCT-WIP-yellow)

[![smithery badge](https://smithery.ai/badge/@FradSer/mcp-server-mas-sequential-thinking)](https://smithery.ai/server/@FradSer/mcp-server-mas-sequential-thinking) [![Twitter Follow](https://img.shields.io/twitter/follow/FradSer?style=social)](https://twitter.com/FradSer) [![Python Version](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) [![Framework](https://img.shields.io/badge/Framework-Agno-orange.svg)](https://github.com/cognitivecomputations/agno)

English | [简体中文](README.zh-CN.md)

This project implements an advanced sequential thinking process using a **Multi-Agent System (MAS)** built with the **Agno** framework and served via **MCP**. It represents a significant evolution from simpler state-tracking approaches by leveraging coordinated, specialized agents for deeper analysis and problem decomposition.

[![MseeP.ai Security Assessment Badge](https://mseep.net/pr/fradser-mcp-server-mas-sequential-thinking-badge.png)](https://mseep.ai/app/fradser-mcp-server-mas-sequential-thinking)

## What is This?

This is an **MCP server** - not a standalone application. It runs as a background service that extends your LLM client (like Claude Desktop) with sophisticated sequential thinking capabilities. The server provides a `sequentialthinking` tool that processes thoughts through multiple specialized AI agents, each examining the problem from a different cognitive angle.

## Core Architecture: Multi-Dimensional Thinking Agents

The system employs **6 specialized thinking agents**, each focused on a distinct cognitive perspective:

### 1. **Factual Agent**
- **Focus**: Objective facts and verified data
- **Approach**: Analytical, evidence-based reasoning
- **Capabilities**:
  - Web research for current facts (via ExaTools)
  - Data verification and source citation
  - Information gap identification
- **Time allocation**: 120 seconds for thorough analysis

### 2. **Emotional Agent**
- **Focus**: Intuition and emotional intelligence
- **Approach**: Gut reactions and feelings
- **Capabilities**:
  - Quick intuitive responses (30-second snapshots)
  - Visceral reactions without justification
  - Emotional pattern recognition
- **Time allocation**: 30 seconds (quick reaction mode)

### 3. **Critical Agent**
- **Focus**: Risk assessment and problem identification
- **Approach**: Logical scrutiny and devil's advocate
- **Capabilities**:
  - Research counterexamples and failures (via ExaTools)
  - Identify logical flaws and risks
  - Challenge assumptions constructively
- **Time allocation**: 120 seconds for deep analysis

### 4. **Optimistic Agent**
- **Focus**: Benefits, opportunities, and value
- **Approach**: Positive exploration with realistic grounding
- **Capabilities**:
  - Research success stories (via ExaTools)
  - Identify feasible opportunities
  - Explore best-case scenarios logically
- **Time allocation**: 120 seconds for balanced optimism

### 5. **Creative Agent**
- **Focus**: Innovation and alternative solutions
- **Approach**: Lateral thinking and idea generation
- **Capabilities**:
  - Cross-industry innovation research (via ExaTools)
  - Divergent thinking techniques
  - Multiple solution generation
- **Time allocation**: 240 seconds (creativity needs time)

### 6. **Synthesis Agent**
- **Focus**: Integration and metacognitive orchestration
- **Approach**: Holistic synthesis and final answer generation
- **Capabilities**:
  - Integrate all perspectives into coherent response
  - Answer the original question directly
  - Provide actionable, user-friendly insights
- **Time allocation**: 60 seconds for synthesis
- **Note**: Uses enhanced model, does NOT include ExaTools (focuses on integration)

## AI-Powered Intelligent Routing

The system uses **AI-driven complexity analysis** to determine the optimal thinking sequence:

### Processing Strategies:
1. **Single Agent** (Simple questions)
   - Direct factual or emotional response
   - Fastest processing for straightforward queries

2. **Double Agent** (Moderate complexity)
   - Two-step sequences (e.g., Optimistic → Critical)
   - Balanced perspectives for evaluation tasks

3. **Triple Agent** (Core thinking)
   - Factual → Creative → Synthesis
   - Philosophical and analytical problems

4. **Full Sequence** (Complex problems)
   - All 6 agents orchestrated together
   - Comprehensive multi-perspective analysis

The AI analyzer evaluates:
- Problem complexity and semantic depth
- Primary problem type (factual, emotional, creative, philosophical, etc.)
- Required thinking modes for optimal solution
- Appropriate model selection (Enhanced vs Standard)

### AI Routing Flow Diagram

```mermaid
flowchart TD
    A[Input Thought] --> B[AI Complexity Analyzer]

    B --> C{Problem Analysis}
    C --> C1[Complexity Score<br/>0-100]
    C --> C2[Problem Type<br/>FACTUAL/EMOTIONAL/<br/>CREATIVE/PHILOSOPHICAL]
    C --> C3[Required Thinking Modes]

    C1 --> D{Routing Decision}
    C2 --> D
    C3 --> D

    D -->|Score: 0-25<br/>Simple| E1[Single Agent Strategy]
    D -->|Score: 26-50<br/>Moderate| E2[Double Agent Strategy]
    D -->|Score: 51-75<br/>Complex| E3[Triple Agent Strategy]
    D -->|Score: 76-100<br/>Highly Complex| E4[Full Sequence Strategy]

    %% Single Agent Flow
    E1 --> F1[Factual Agent<br/>120s + ExaTools]
    F1 --> G1[Direct Response]

    %% Double Agent Flow (Full Parallel)
    E2 --> DA1[Both Agents Run in Parallel]
    DA1 --> DA2["Agent 1 e.g. Optimistic<br/>120s + ExaTools"]
    DA1 --> DA3["Agent 2 e.g. Critical<br/>120s + ExaTools"]
    DA2 --> G2[Programmatic Synthesis<br/>Combines both parallel results]
    DA3 --> G2

    %% Triple Agent Flow (Full Parallel)
    E3 --> TA1[All 3 Agents Run in Parallel]
    TA1 --> TA2[Factual Agent<br/>120s + ExaTools]
    TA1 --> TA3[Creative Agent<br/>240s + ExaTools]
    TA1 --> TA4[Critical Agent<br/>120s + ExaTools]
    TA2 --> G3[Programmatic Synthesis<br/>Integrates all 3 results]
    TA3 --> G3
    TA4 --> G3

    %% Full Sequence Flow (3-Step Process)
    E4 --> FS1[Step 1: Initial Synthesis<br/>60s Enhanced Model<br/>Initial orchestration]
    FS1 --> FS2[Step 2: Parallel Execution<br/>5 Agents Run Simultaneously]

    FS2 --> FS2A[Factual Agent<br/>120s + ExaTools]
    FS2 --> FS2B[Emotional Agent<br/>30s Quick Response]
    FS2 --> FS2C[Optimistic Agent<br/>120s + ExaTools]
    FS2 --> FS2D[Critical Agent<br/>120s + ExaTools]
    FS2 --> FS2E[Creative Agent<br/>240s + ExaTools]

    FS2A --> FS3[Step 3: Final Synthesis<br/>60s Enhanced Model<br/>Integrates all parallel results]
    FS2B --> FS3
    FS2C --> FS3
    FS2D --> FS3
    FS2E --> FS3

    FS3 --> G4[Final Synthesis Output<br/>Comprehensive integrated result]

    G1 --> H[Next Iteration or<br/>Final Answer]
    G2 --> H
    G3 --> H
    G4 --> H

    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style C fill:#fff3e0
    style D fill:#e8f5e8
    style TA1 fill:#ffecb3
    style FS2 fill:#ffecb3
    style G1 fill:#fce4ec
    style G2 fill:#fce4ec
    style G3 fill:#fce4ec
    style G4 fill:#fce4ec
    style H fill:#f1f8e9
```

**Key Insights:**
- **Parallel Execution**: Non-synthesis agents run simultaneously for maximum efficiency
- **Synthesis Integration**: Synthesis agents process parallel results sequentially
- **Two Processing Types**:
  - **Synthesis Agent**: Real AI agent using Enhanced Model for integration
  - **Programmatic Synthesis**: Code-based combination when no Synthesis Agent
- **Performance**: Parallel processing optimizes both speed and quality

## Research Capabilities (ExaTools Integration)

**4 out of 6 agents** are equipped with web research capabilities via ExaTools:

- **Factual Agent**: Search for current facts, statistics, verified data
- **Critical Agent**: Find counterexamples, failed cases, regulatory issues
- **Optimistic Agent**: Research success stories, positive case studies
- **Creative Agent**: Discover innovations across different industries
- **Emotional & Synthesis Agents**: No ExaTools (focused on internal processing)

Research is **optional** - requires `EXA_API_KEY` environment variable. The system works perfectly without it, using pure reasoning capabilities.

## Model Intelligence

### Dual Model Strategy:
- **Enhanced Model**: Used for Synthesis agent (complex integration tasks)
- **Standard Model**: Used for individual thinking agents
- **AI Selection**: System automatically chooses the right model based on task complexity

### Supported Providers:
- **DeepSeek** (default) - High performance, cost-effective
- **Groq** - Ultra-fast inference
- **OpenRouter** - Access to multiple models
- **GitHub Models** - OpenAI models via GitHub API
- **Anthropic** - Claude models with prompt caching
- **Ollama** - Local model execution

## Key Differences from Original Version (TypeScript)

This Python/Agno implementation marks a fundamental shift from the original TypeScript version:

| Feature/Aspect      | Python/Agno Version (Current)                                        | TypeScript Version (Original)                        |
| :------------------ | :------------------------------------------------------------------- | :--------------------------------------------------- |
| **Architecture**    | **Multi-Agent System (MAS)**; Active processing by a team of agents. | **Single Class State Tracker**; Simple logging/storing. |
| **Intelligence**    | **Distributed Agent Logic**; Embedded in specialized agents & Coordinator. | **External LLM Only**; No internal intelligence.     |
| **Processing**      | **Active Analysis & Synthesis**; Agents *act* on the thought.      | **Passive Logging**; Merely recorded the thought.    |
| **Frameworks**      | **Agno (MAS) + FastMCP (Server)**; Uses dedicated MAS library.     | **MCP SDK only**.                                    |
| **Coordination**    | **Explicit Team Coordination Logic** (`Team` in `coordinate` mode).  | **None**; No coordination concept.                   |
| **Validation**      | **Pydantic Schema Validation**; Robust data validation.            | **Basic Type Checks**; Less reliable.              |
| **External Tools**  | **Integrated (Exa via Researcher)**; Can perform research tasks.   | **None**.                                            |
| **Logging**         | **Structured Python Logging (File + Console)**; Configurable.      | **Console Logging with Chalk**; Basic.             |
| **Language & Ecosystem** | **Python**; Leverages Python AI/ML ecosystem.                    | **TypeScript/Node.js**.                              |

In essence, the system evolved from a passive thought *recorder* to an active thought *processor* powered by a collaborative team of AI agents.

## How it Works (Multi-Dimensional Processing)

1.  **Initiation:** An external LLM uses the `sequentialthinking` tool to define the problem and initiate the process.
2.  **Tool Call:** The LLM calls the `sequentialthinking` tool with the current thought, structured according to the `ThoughtData` model.
3.  **AI Complexity Analysis:** The system uses AI-powered analysis to determine the optimal thinking sequence based on problem complexity and type.
4.  **Agent Routing:** Based on the analysis, the system routes the thought to the appropriate thinking agents (single, double, triple, or full sequence).
5.  **Parallel Processing:** Multiple thinking agents process the thought simultaneously from their specialized perspectives:
   - Factual agents gather objective data (with optional web research)
   - Critical agents identify risks and problems
   - Optimistic agents explore opportunities and benefits
   - Creative agents generate innovative solutions
   - Emotional agents provide intuitive insights
6.  **Research Integration:** Agents equipped with ExaTools conduct targeted web research to enhance their analysis.
7.  **Synthesis & Integration:** The Synthesis agent integrates all perspectives into a coherent, actionable response using enhanced models.
8.  **Response Generation:** The system returns a comprehensive analysis with guidance for next steps.
9.  **Iteration:** The calling LLM uses the synthesized response to formulate the next thinking step or conclude the process.

## Token Consumption Warning

**High Token Usage:** Due to the Multi-Agent System architecture, this tool consumes significantly **more tokens** than single-agent alternatives or the previous TypeScript version. Each `sequentialthinking` call invokes multiple specialized agents simultaneously, leading to substantially higher token usage (potentially 5-10x more than simple approaches).

This parallel processing leads to substantially higher token usage (potentially 5-10x more) compared to simpler sequential approaches, but provides correspondingly deeper and more comprehensive analysis.

## MCP Tool: `sequentialthinking`

The server exposes a single MCP tool that processes sequential thoughts:

### Parameters:
```typescript
{
  thought: string,              // Current thinking step content
  thoughtNumber: number,         // Sequence number (≥1)
  totalThoughts: number,         // Estimated total steps
  nextThoughtNeeded: boolean,    // Is another step required?
  isRevision: boolean,           // Revising previous thought?
  branchFromThought?: number,    // Branch point (for exploration)
  branchId?: string,             // Branch identifier
  needsMoreThoughts: boolean     // Need to extend sequence?
}
```

### Response:
Returns synthesized analysis from the multi-agent system with:
- Processed thought analysis
- Guidance for next steps
- Branch and revision tracking
- Status and metadata

## Installation

### Prerequisites

- Python 3.10+
- LLM API access (choose one):
    - **DeepSeek**: `DEEPSEEK_API_KEY` (default, recommended)
    - **Groq**: `GROQ_API_KEY`
    - **OpenRouter**: `OPENROUTER_API_KEY`
    - **GitHub Models**: `GITHUB_TOKEN`
    - **Anthropic**: `ANTHROPIC_API_KEY`
    - **Ollama**: Local installation (no API key)
- **Optional**: `EXA_API_KEY` for web research capabilities
- `uv` package manager (recommended) or `pip`

### Quick Start

#### 1. Install via Smithery (Recommended)

```bash
npx -y @smithery/cli install @FradSer/mcp-server-mas-sequential-thinking --client claude
```

#### 2. Manual Installation

```bash
# Clone the repository
git clone https://github.com/FradSer/mcp-server-mas-sequential-thinking.git
cd mcp-server-mas-sequential-thinking

# Install with uv (recommended)
uv pip install .

# Or with pip
pip install .
```

### Configuration

#### For MCP Clients (Claude Desktop, etc.)

Add to your MCP client configuration:

```json
{
  "mcpServers": {
    "sequential-thinking": {
      "command": "mcp-server-mas-sequential-thinking",
      "env": {
        "LLM_PROVIDER": "deepseek",
        "DEEPSEEK_API_KEY": "your_api_key",
        "EXA_API_KEY": "your_exa_key_optional"
      }
    }
  }
}
```

#### Environment Variables

Create a `.env` file or set these variables:

```bash
# LLM Provider (required)
LLM_PROVIDER="deepseek"  # deepseek, groq, openrouter, github, anthropic, ollama
DEEPSEEK_API_KEY="sk-..."

# Optional: Enhanced/Standard Model Selection
# DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat"  # For synthesis
# DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat"  # For other agents

# Optional: Web Research (enables ExaTools)
# EXA_API_KEY="your_exa_api_key"

# Optional: Custom endpoint
# LLM_BASE_URL="https://custom-endpoint.com"
```

### Model Configuration Examples

```bash
# Groq with different models
GROQ_ENHANCED_MODEL_ID="openai/gpt-oss-120b"
GROQ_STANDARD_MODEL_ID="openai/gpt-oss-20b"

# Anthropic with Claude models
ANTHROPIC_ENHANCED_MODEL_ID="claude-3-5-sonnet-20241022"
ANTHROPIC_STANDARD_MODEL_ID="claude-3-5-haiku-20241022"

# GitHub Models
GITHUB_ENHANCED_MODEL_ID="gpt-4o"
GITHUB_STANDARD_MODEL_ID="gpt-4o-mini"
```

## Usage

### As MCP Server

Once installed and configured in your MCP client:

1. The `sequentialthinking` tool becomes available
2. Your LLM can use it to process complex thoughts
3. The system automatically routes to appropriate thinking agents
4. Results are synthesized and returned to your LLM

### Direct Execution

Run the server manually for testing:

```bash
# Using installed script
mcp-server-mas-sequential-thinking

# Using uv
uv run mcp-server-mas-sequential-thinking

# Using Python
python src/mcp_server_mas_sequential_thinking/main.py
```

## Development

### Setup

```bash
# Clone repository
git clone https://github.com/FradSer/mcp-server-mas-sequential-thinking.git
cd mcp-server-mas-sequential-thinking

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install with dev dependencies
uv pip install -e ".[dev]"
```

### Code Quality

```bash
# Format and lint
uv run ruff check . --fix
uv run ruff format .
uv run mypy .

# Run tests (when available)
uv run pytest
```

### Testing with MCP Inspector

```bash
npx @modelcontextprotocol/inspector uv run mcp-server-mas-sequential-thinking
```

Open http://127.0.0.1:6274/ and test the `sequentialthinking` tool.

## System Characteristics

### Strengths:
- **Multi-perspective analysis**: 6 different cognitive approaches
- **AI-powered routing**: Intelligent complexity analysis
- **Research capabilities**: 4 agents with web search (optional)
- **Flexible processing**: Single to full sequence strategies
- **Model optimization**: Enhanced/Standard model selection
- **Provider agnostic**: Works with multiple LLM providers

### Considerations:
- **Token usage**: Multi-agent processing uses more tokens than single-agent
- **Processing time**: Complex sequences take longer but provide deeper insights
- **API costs**: Research capabilities require separate Exa API subscription
- **Model selection**: Enhanced models cost more but provide better synthesis

## Project Structure

```
mcp-server-mas-sequential-thinking/
├── src/mcp_server_mas_sequential_thinking/
│   ├── main.py                          # MCP server entry point
│   ├── processors/
│   │   ├── multi_thinking_core.py       # 6 thinking agents definition
│   │   └── multi_thinking_processor.py  # Sequential processing logic
│   ├── routing/
│   │   ├── ai_complexity_analyzer.py    # AI-powered analysis
│   │   └── multi_thinking_router.py     # Intelligent routing
│   ├── services/
│   │   ├── thought_processor_refactored.py
│   │   ├── workflow_executor.py
│   │   └── context_builder.py
│   └── config/
│       ├── modernized_config.py         # Provider strategies
│       └── constants.py                 # System constants
├── pyproject.toml                       # Project configuration
└── README.md                            # This file
```

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for version history.

## Contributing

Contributions are welcome! Please ensure:

1. Code follows project style (ruff, mypy)
2. Commit messages use conventional commits format
3. All tests pass before submitting PR
4. Documentation is updated as needed

## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments

- Built with [Agno](https://github.com/agno-agi/agno) v2.0+ framework
- Model Context Protocol by [Anthropic](https://www.anthropic.com/)
- Research capabilities powered by [Exa](https://exa.ai/) (optional)
- Multi-dimensional thinking inspired by Edward de Bono's work

## Support

- GitHub Issues: [Report bugs or request features](https://github.com/FradSer/mcp-server-mas-sequential-thinking/issues)
- Documentation: Check CLAUDE.md for detailed implementation notes
- MCP Protocol: [Official MCP Documentation](https://modelcontextprotocol.io/)

---

**Note**: This is an MCP server, designed to work with MCP-compatible clients like Claude Desktop. It is not a standalone chat application.
```

--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------

```markdown
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Essential Commands

```bash
# Setup & Installation
uv pip install -e ".[dev]"                              # Install all dependencies
uv run python -c "import agno; print('Agno imported successfully')"  # Verify setup

# Development Workflow
uv run mcp-server-mas-sequential-thinking               # Run server
uv run ruff check . --fix && uv run ruff format . && uv run mypy .  # Code quality
uv run pytest --cov=. --cov-report=html                # Test with coverage (no tests currently)

# Monitoring & Debugging
tail -f ~/.sequential_thinking/logs/sequential_thinking.log  # Live logs
grep "ERROR\|WARNING" ~/.sequential_thinking/logs/sequential_thinking.log  # Error search
```

### Additional Commands
- **Upgrade agno**: `uv pip install --upgrade agno`
- **Alternative server runs**: `uvx mcp-server-mas-sequential-thinking` or `uv run python src/mcp_server_mas_sequential_thinking/main.py`
- **MCP Inspector**: `npx @modelcontextprotocol/inspector uv run python src/mcp_server_mas_sequential_thinking/main.py`

## Project Overview

**Pure Multi-Thinking Implementation** built with **Agno v2.0** framework and served via MCP. Features **AI-powered intelligent routing** with streamlined architecture (src layout, Python 3.10+). The system processes thoughts through multi-directional thinking methodology with AI-driven complexity analysis and optimized model selection.

### Core Architecture

**Entry Point:** `src/mcp_server_mas_sequential_thinking/main.py`
- FastMCP application with `sequentialthinking` tool
- Uses refactored service-based architecture with dependency injection
- Global state management via `ServerState` and `ThoughtProcessor`

**Multi-Thinking Processing Flow:**
```
External LLM → sequentialthinking tool → ThoughtProcessor → WorkflowExecutor → MultiThinkingWorkflowRouter → MultiThinkingSequentialProcessor → Individual Thinking Agents → Synthesis
```

**Core Services (Dependency Injection):**
- **ThoughtProcessor**: Main orchestrator using specialized services
- **WorkflowExecutor**: Manages Multi-Thinking workflow execution
- **ContextBuilder**: Builds context-aware prompts
- **ResponseFormatter**: Formats final responses
- **SessionMemory**: Tracks thought history and branching

**AI-Powered Routing System:**
- **MultiThinkingIntelligentRouter**: AI-driven complexity analysis determines thinking sequence
- **AIComplexityAnalyzer**: Uses LLM to assess thought complexity, problem type, and required thinking modes
- **MultiThinkingSequentialProcessor**: Executes chosen sequence with model optimization
- **Thinking Complexity levels**: SINGLE, DOUBLE, TRIPLE, FULL sequences
- **Model Intelligence**: Enhanced model for Blue Hat synthesis, Standard model for individual hats

### Configuration & Data Flow

**Environment Variables:**
- `LLM_PROVIDER`: Provider selection (deepseek, groq, openrouter, ollama, github, anthropic)
- `{PROVIDER}_API_KEY`: API keys (e.g., `DEEPSEEK_API_KEY`, `GITHUB_TOKEN`)
- `{PROVIDER}_ENHANCED_MODEL_ID`: Enhanced model for complex synthesis (Blue Hat)
- `{PROVIDER}_STANDARD_MODEL_ID`: Standard model for individual hat processing
- `EXA_API_KEY`: Research capabilities (if using research agents)

**AI-Driven Model Strategy:**
- **Enhanced Models**: Used for Blue Hat (metacognitive) thinking - complex synthesis, integration
- **Standard Models**: Used for individual hat processing (White, Red, Black, Yellow, Green)
- **Intelligent Selection**: System automatically chooses appropriate model based on hat type and AI-assessed complexity
- **AI Analysis**: Replaces rule-based pattern matching with semantic understanding

**Processing Strategies (AI-Determined):**
1. **Single Hat**: Simple focused thinking (White Hat facts, Red Hat emotions, etc.)
2. **Double Hat**: Two-step sequences (e.g., Optimistic→Critical for idea evaluation)
3. **Triple Hat**: Core philosophical thinking (Factual→Creative→Synthesis)
4. **Full Sequence**: Complete Multi-Thinking methodology with Blue Hat orchestration

### Streamlined Module Architecture

**Core Framework:**
- `core/session.py`: SessionMemory for thought history (simplified, no Team dependency)
- `core/models.py`: ThoughtData validation and core data structures
- `core/types.py`: Type definitions and protocols
- `config/modernized_config.py`: Provider strategies with Enhanced/Standard model configuration
- `config/constants.py`: All system constants and configuration values

**Multi-Thinking Implementation:**
- `processors/multi_thinking_processor.py`: Main Multi-Thinking sequential processor
- `processors/multi_thinking_core.py`: Hat definitions, agent factory, core logic
- `routing/multi_thinking_router.py`: AI-powered intelligent routing based on thought complexity
- `routing/ai_complexity_analyzer.py`: AI-driven complexity and problem type analysis
- `routing/agno_workflow_router.py`: Agno Workflow integration layer
- `routing/complexity_types.py`: Core complexity analysis types and enums

**Service Layer:**
- `services/thought_processor_refactored.py`: Main thought processor with dependency injection
- `services/workflow_executor.py`: Multi-Thinking workflow execution
- `services/context_builder.py`: Context-aware prompt building
- `services/response_formatter.py`: Response formatting and extraction
- `services/server_core.py`: Server lifecycle and state management

**Infrastructure:**
- `infrastructure/logging_config.py`: Structured logging with rotation
- `infrastructure/persistent_memory.py`: Memory persistence capabilities
- `utils/utils.py`: Logging utilities and helper functions

### Architecture Characteristics

- **Clean Architecture**: Dependency injection, separation of concerns, service-based design
- **AI-Driven Intelligence**: Pure AI-based complexity analysis replacing rule-based systems
- **Multi-Thinking Focus**: Streamlined implementation without legacy multi-agent complexity
- **Model Optimization**: Smart model selection (Enhanced for synthesis, Standard for processing)
- **Modern Python**: Dataclasses, type hints, async/await, pattern matching
- **Environment-based config**: No config files, all via environment variables
- **Structured logging**: Rotation to `~/.sequential_thinking/logs/`

## Enhanced/Standard Model Configuration

**Naming Convention:**
- `{PROVIDER}_ENHANCED_MODEL_ID`: For complex synthesis tasks (Blue Hat thinking)
- `{PROVIDER}_STANDARD_MODEL_ID`: For individual hat processing

**Examples:**
```bash
# GitHub Models
GITHUB_ENHANCED_MODEL_ID="openai/gpt-5"      # Blue Hat synthesis
GITHUB_STANDARD_MODEL_ID="openai/gpt-5-min"  # Individual hats

# DeepSeek
DEEPSEEK_ENHANCED_MODEL_ID="deepseek-chat"   # Both synthesis and processing
DEEPSEEK_STANDARD_MODEL_ID="deepseek-chat"

# Anthropic
ANTHROPIC_ENHANCED_MODEL_ID="claude-3-5-sonnet-20241022"  # Synthesis
ANTHROPIC_STANDARD_MODEL_ID="claude-3-5-haiku-20241022"   # Processing
```

**Usage Strategy:**
- **Enhanced Model**: Blue Hat (metacognitive orchestrator) uses enhanced model for final synthesis
- **Standard Model**: Individual hats (White, Red, Black, Yellow, Green) use standard model
- **AI-Driven Selection**: System intelligently chooses model based on hat type and AI-assessed complexity

## Agno v2.0 Integration

**Framework Features:**
- **Workflow Integration**: Uses Agno Workflow system for Multi-Thinking processing
- **Agent Factory**: Creates specialized hat agents with ReasoningTools
- **Performance**: ~10,000x faster agent creation, ~50x less memory vs LangGraph
- **Version**: Requires `agno>=2.0.5`

**Key Integration Points:**
- `MultiThinkingWorkflowRouter`: Bridges MCP and Agno Workflow systems
- `MultiThinkingAgentFactory`: Creates individual hat agents using Agno v2.0
- **StepOutput**: Workflow results converted to Agno StepOutput format

**For Agno Documentation**: Use deepwiki MCP reference with repoName: `agno-agi/agno`

## AI-Powered Complexity Analysis

**Key Innovation**: The system uses AI instead of rule-based pattern matching for complexity analysis:

- **AIComplexityAnalyzer**: Uses LLM to assess thought complexity, semantic depth, and problem characteristics
- **Problem Type Detection**: AI identifies primary problem type (FACTUAL, EMOTIONAL, CREATIVE, PHILOSOPHICAL, etc.)
- **Thinking Modes Recommendation**: AI suggests required thinking modes for optimal processing
- **Semantic Understanding**: Replaces keyword matching with contextual analysis across languages

**Benefits over Rule-Based Systems:**
- Better handling of nuanced, philosophical, or cross-cultural content
- Adaptive to new problem types without code changes
- Semantic understanding vs simple pattern matching
- Reduced maintenance overhead (no keyword lists to maintain)

## Development Notes

**No Test Suite**: The project currently has no test files - all tests were removed during recent cleanup.

**Recent Architecture Changes**:
- Removed legacy multi-agent systems (agents/, optimization/, analysis/ modules)
- Consolidated configuration (removed processing_constants.py redundancy)
- Streamlined to 8 core modules focused on AI-driven Multi-Thinking

**Code Quality**: Uses ruff for linting/formatting, mypy for type checking. Run `uv run ruff check . --fix && uv run ruff format . && uv run mypy .` before committing.
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/utils/__init__.py:
--------------------------------------------------------------------------------

```python
"""Utilities module for MCP Sequential Thinking Server.

This module contains utility functions and helper classes.
"""

from .utils import setup_logging

__all__ = [
    # From utils
    "setup_logging",
]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/utils/utils.py:
--------------------------------------------------------------------------------

```python
"""Application utilities - completely rewritten for new architecture."""

from mcp_server_mas_sequential_thinking.infrastructure.logging_config import (
    get_logger,
    setup_logging,
)

# Re-export for convenience - no backward compatibility
__all__ = ["get_logger", "setup_logging"]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/infrastructure/__init__.py:
--------------------------------------------------------------------------------

```python
"""Infrastructure module for MCP Sequential Thinking Server.

This module contains infrastructure concerns including logging configuration
and persistent memory.
"""

from .logging_config import LogTimer, MetricsLogger, get_logger, setup_logging
from .persistent_memory import (
    PersistentMemoryManager,
    create_persistent_memory,
)

__all__ = [
    "LogTimer",
    "MetricsLogger",
    # From persistent_memory
    "PersistentMemoryManager",
    "create_persistent_memory",
    # From logging_config
    "get_logger",
    "setup_logging",
]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/core/__init__.py:
--------------------------------------------------------------------------------

```python
"""Core domain module for MCP Sequential Thinking Server.

This module contains the core domain logic including data models,
types, and session management functionality.
"""

from .models import ThoughtData
from .session import SessionMemory
from .types import (
    ConfigurationError,
    CoordinationPlan,
    ExecutionMode,
    ProcessingMetadata,
    TeamCreationError,
    ThoughtProcessingError,
)

__all__ = [
    "ConfigurationError",
    "CoordinationPlan",
    "ExecutionMode",
    "ProcessingMetadata",
    # From session
    "SessionMemory",
    "TeamCreationError",
    # From models
    "ThoughtData",
    # From types
    "ThoughtProcessingError",
]

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-alpine

# Set environment variables to prevent Python from writing .pyc files
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

# Install build dependencies
RUN apk add --no-cache gcc musl-dev libffi-dev openssl-dev

# Set working directory
WORKDIR /app

# Copy project files
COPY pyproject.toml ./
COPY main.py ./
COPY README.md ./

# Upgrade pip and install hatchling build tool
RUN pip install --upgrade pip && \
    pip install hatchling

# Build and install the project
RUN pip install .

# Command to run the MCP server
CMD ["mcp-server-mas-sequential-thinking"]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/processors/__init__.py:
--------------------------------------------------------------------------------

```python
"""Processors module for MCP Sequential Thinking Server.

This module contains processing logic including multi-thinking core functionality
and multi-thinking processing implementation.
"""

from .multi_thinking_core import (
    MultiThinkingAgentFactory,
    ThinkingDirection,
    create_thinking_agent,
    get_all_thinking_directions,
    get_thinking_timing,
)
from .multi_thinking_processor import (
    MultiThinkingProcessingResult,
    MultiThinkingSequentialProcessor,
    create_multi_thinking_step_output,
)

__all__ = [
    "MultiThinkingAgentFactory",
    # From multi_thinking_processor
    "MultiThinkingProcessingResult",
    "MultiThinkingSequentialProcessor",
    # From multi_thinking_core
    "ThinkingDirection",
    "create_multi_thinking_step_output",
    "create_thinking_agent",
    "get_all_thinking_directions",
    "get_thinking_timing",
]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/config/__init__.py:
--------------------------------------------------------------------------------

```python
"""Configuration module for MCP Sequential Thinking Server.

This module contains all configuration-related functionality including
constants, processing constants, and modernized configuration management.
"""

from .constants import (
    ComplexityThresholds,
    DefaultTimeouts,
    DefaultValues,
    FieldLengthLimits,
    LoggingLimits,
    PerformanceMetrics,
    ProcessingDefaults,
    QualityThresholds,
    SecurityConstants,
    ValidationLimits,
)
from .modernized_config import check_required_api_keys, get_model_config

__all__ = [
    # From constants
    "ComplexityThresholds",
    "DefaultTimeouts",
    "DefaultValues",
    "FieldLengthLimits",
    "LoggingLimits",
    "PerformanceMetrics",
    "ProcessingDefaults",
    "QualityThresholds",
    "SecurityConstants",
    "ValidationLimits",
    # From modernized_config
    "check_required_api_keys",
    "get_model_config",
]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/__init__.py:
--------------------------------------------------------------------------------

```python
"""Routing module for MCP Sequential Thinking Server.

This module contains routing and workflow logic including adaptive routing,
workflow routing, optimization, and multi-thinking routing functionality.
"""

from .agno_workflow_router import (
    MultiThinkingWorkflowResult,
    MultiThinkingWorkflowRouter,
)
from .ai_complexity_analyzer import AIComplexityAnalyzer
from .complexity_types import ComplexityLevel, ProcessingStrategy
from .multi_thinking_router import (
    MultiThinkingIntelligentRouter,
    create_multi_thinking_router,
)

__all__ = [
    # From ai_complexity_analyzer
    "AIComplexityAnalyzer",
    # From complexity_types
    "ComplexityLevel",
    # From multi_thinking_router
    "MultiThinkingIntelligentRouter",
    # From agno_workflow_router
    "MultiThinkingWorkflowResult",
    "MultiThinkingWorkflowRouter",
    "ProcessingStrategy",
    "create_multi_thinking_router",
]

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/__init__.py:
--------------------------------------------------------------------------------

```python
"""Services module for MCP Sequential Thinking Server.

This module contains business logic services including server core,
response processing, retry handling, and specialized processing services.
"""

from .context_builder import ContextBuilder
from .processing_orchestrator import ProcessingOrchestrator
from .response_formatter import ResponseExtractor, ResponseFormatter
from .response_processor import ResponseProcessor
from .retry_handler import TeamProcessingRetryHandler
from .server_core import (
    ServerConfig,
    ServerState,
    ThoughtProcessor,
    create_server_lifespan,
    create_validated_thought_data,
)
from .workflow_executor import WorkflowExecutor

__all__ = [
    # From context_builder
    "ContextBuilder",
    # From processing_orchestrator
    "ProcessingOrchestrator",
    # From response_formatter
    "ResponseExtractor",
    "ResponseFormatter",
    # From response_processor
    "ResponseProcessor",
    # From server_core
    "ServerConfig",
    "ServerState",
    # From retry_handler
    "TeamProcessingRetryHandler",
    "ThoughtProcessor",
    # From workflow_executor
    "WorkflowExecutor",
    "create_server_lifespan",
    "create_validated_thought_data",
]

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - llmProvider
      - deepseekApiKey
    properties:
      llmProvider:
        type: string
        default: deepseek
        description: "LLM Provider. Options: deepseek, groq, openrouter."
      deepseekApiKey:
        type: string
        description: API key for DeepSeek. Required if llmProvider is 'deepseek'.
      groqApiKey:
        type: string
        default: ""
        description: API key for Groq. Required if llmProvider is 'groq'.
      openrouterApiKey:
        type: string
        default: ""
        description: API key for OpenRouter. Required if llmProvider is 'openrouter'.
      exaApiKey:
        type: string
        default: ""
        description: API key for Exa to enable web research capabilities for thinking agents.
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({
      command: 'mcp-server-mas-sequential-thinking',
      args: [],
      env: {
        LLM_PROVIDER: config.llmProvider,
        DEEPSEEK_API_KEY: config.deepseekApiKey,
        GROQ_API_KEY: config.groqApiKey,
        OPENROUTER_API_KEY: config.openrouterApiKey,
        EXA_API_KEY: config.exaApiKey
      }
    })
  exampleConfig:
    llmProvider: deepseek
    deepseekApiKey: your_deepseek_api_key
    groqApiKey: ""
    openrouterApiKey: ""
    exaApiKey: your_exa_api_key

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/__init__.py:
--------------------------------------------------------------------------------

```python
"""MCP Sequential Thinking Server with AI-Powered Routing.

A sophisticated Multi-Agent System (MAS) for sequential thinking with intelligent
AI-based routing using advanced multi-thinking methodology.

AI ROUTING FEATURES:
- AI-powered complexity analysis replacing rule-based scoring
- Intelligent thinking sequence selection (single, double, triple, full)
- Semantic understanding of philosophical and technical depth
- Automatic synthesis integration for complex thought processing

CORE ARCHITECTURE:
- AI-first routing with semantic complexity understanding
- Multi-thinking methodology with intelligent orchestration
- Unified agent factory eliminating code duplication
- Separated server concerns for better maintainability
- Optimized performance with async processing

Key Features:
- **AI Routing**: Semantic complexity analysis and strategy selection
- **Multi-Thinking Integration**: Factual, emotional, critical, optimistic, creative,
  and synthesis processing
- **Multi-Provider Support**: DeepSeek, Groq, OpenRouter, GitHub, Ollama
- **Philosophical Understanding**: Deep analysis for existential questions

Usage:
    # Server usage
    uv run mcp-server-mas-sequential-thinking

    # Direct processing
    from mcp_server_mas_sequential_thinking.services.server_core import ThoughtProcessor
    from mcp_server_mas_sequential_thinking.core.session import SessionMemory

    processor = ThoughtProcessor(session_memory)
    result = await processor.process_thought(thought_data)

AI Routing Strategies:
    - Single Direction: Simple factual questions → quick focused processing
    - Double/Triple Direction: Moderate complexity → focused thinking sequences
    - Full Multi-Thinking: Complex philosophical questions → comprehensive analysis

Configuration:
    Environment variables:
    - LLM_PROVIDER: Primary provider (deepseek, groq, openrouter, github, ollama)
    - {PROVIDER}_API_KEY: API keys for providers
    - {PROVIDER}_{TEAM|AGENT}_MODEL_ID: Model selection
    - AI_CONFIDENCE_THRESHOLD: Routing confidence threshold (default: 0.7)

Performance Benefits:
    - Intelligent complexity assessment using AI understanding
    - Automated synthesis processing for coherent responses
    - Semantic depth recognition for philosophical questions
    - Efficient thinking sequence selection based on content analysis
"""

__version__ = "0.6.0"


def get_version() -> str:
    """Get package version."""
    return __version__

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/complexity_types.py:
--------------------------------------------------------------------------------

```python
"""Core complexity analysis types and enums.

This file contains the essential types needed for complexity analysis,
extracted from the old adaptive_routing.py to support AI-first architecture.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from mcp_server_mas_sequential_thinking.core.models import ThoughtData


class ProcessingStrategy(Enum):
    """Processing strategy types."""

    SINGLE_AGENT = "single_agent"
    MULTI_AGENT = "multi_agent"
    HYBRID = "hybrid"


class ComplexityLevel(Enum):
    """Complexity levels for thought processing."""

    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"
    HIGHLY_COMPLEX = "highly_complex"


@dataclass
class ComplexityMetrics:
    """Metrics for thought complexity analysis."""

    # Core metrics (used by both AI and fallback analyzers)
    complexity_score: float  # Primary score (0-100)

    # Detailed breakdown (optional, for debugging/analysis)
    word_count: int = 0
    sentence_count: int = 0
    question_count: int = 0
    technical_terms: int = 0
    branching_references: int = 0
    research_indicators: int = 0
    analysis_depth: int = 0
    philosophical_depth_boost: int = 0

    # AI Analysis Results (critical for routing decisions)
    primary_problem_type: str = "GENERAL"  # AI-determined problem type
    thinking_modes_needed: list[str] | None = None  # AI-recommended thinking modes

    # Analysis metadata
    analyzer_type: str = "unknown"  # "ai" or "basic"
    reasoning: str = ""  # Why this score was assigned

    def __post_init__(self):
        if self.thinking_modes_needed is None:
            self.thinking_modes_needed = ["SYNTHESIS"]


@dataclass
class RoutingDecision:
    """Decision result from routing analysis."""

    strategy: ProcessingStrategy
    complexity_level: ComplexityLevel
    complexity_score: float
    reasoning: str
    estimated_token_usage: tuple[int, int]  # (min, max)
    estimated_cost: float
    specialist_recommendations: list[str] | None = None

    def __post_init__(self):
        if self.specialist_recommendations is None:
            self.specialist_recommendations = []


class ComplexityAnalyzer(ABC):
    """Abstract base class for complexity analysis."""

    @abstractmethod
    async def analyze(self, thought_data: "ThoughtData") -> ComplexityMetrics:
        """Analyze thought complexity and return metrics."""

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/core/session.py:
--------------------------------------------------------------------------------

```python
"""Session management for thought history and branching."""

import asyncio
from dataclasses import dataclass, field

from mcp_server_mas_sequential_thinking.config.constants import ValidationLimits

from .models import ThoughtData
from .types import BranchId, ThoughtNumber


@dataclass
class SessionMemory:
    """Manages thought history and branches with optimized lookups and DoS protection.

    Thread-safe implementation with async locks to prevent race conditions.
    """

    thought_history: list[ThoughtData] = field(default_factory=list)
    branches: dict[BranchId, list[ThoughtData]] = field(default_factory=dict)
    # High-performance cache for O(1) thought lookups by number
    _thought_cache: dict[ThoughtNumber, ThoughtData] = field(
        default_factory=dict, init=False, repr=False
    )
    # Thread safety lock for concurrent access protection
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False)

    # DoS protection constants as class attributes
    MAX_THOUGHTS_PER_SESSION: int = ValidationLimits.MAX_THOUGHTS_PER_SESSION
    MAX_BRANCHES_PER_SESSION: int = ValidationLimits.MAX_BRANCHES_PER_SESSION
    MAX_THOUGHTS_PER_BRANCH: int = ValidationLimits.MAX_THOUGHTS_PER_BRANCH

    async def add_thought(self, thought: ThoughtData) -> None:
        """Add thought with efficient DoS protection and optimized branch management.

        Thread-safe implementation with async lock to prevent race conditions.
        """
        async with self._lock:
            # Early DoS protection checks with descriptive errors
            self._validate_session_limits(thought)

            # Update data structures atomically under lock
            self.thought_history.append(thought)
            self._thought_cache[thought.thoughtNumber] = thought

            # Handle branching with optimized setdefault pattern
            if thought.branchFromThought is not None and thought.branchId is not None:
                self.branches.setdefault(thought.branchId, []).append(thought)

    def _validate_session_limits(self, thought: ThoughtData) -> None:
        """Validate session limits with early exit optimization."""
        # Primary session limit check
        if len(self.thought_history) >= self.MAX_THOUGHTS_PER_SESSION:
            raise ValueError(
                f"Session exceeds maximum {self.MAX_THOUGHTS_PER_SESSION} thoughts"
            )

        # Branch-specific validations only if needed
        if not thought.branchId:
            return

        # Check total branch limit
        if len(self.branches) >= self.MAX_BRANCHES_PER_SESSION:
            raise ValueError(
                f"Session exceeds maximum {self.MAX_BRANCHES_PER_SESSION} branches"
            )

        # Check individual branch limit
        if (
            thought.branchId in self.branches
            and len(self.branches[thought.branchId]) >= self.MAX_THOUGHTS_PER_BRANCH
        ):
            raise ValueError(
                f"Branch '{thought.branchId}' exceeds maximum "
                f"{self.MAX_THOUGHTS_PER_BRANCH} thoughts"
            )

    async def find_thought_content(self, thought_number: ThoughtNumber) -> str:
        """Find the content of a specific thought by number using optimized cache lookup."""
        async with self._lock:
            # Use cache for O(1) lookup instead of O(n) search
            thought = self._thought_cache.get(thought_number)
            return thought.thought if thought else "Unknown thought"

    async def get_branch_summary(self) -> dict[BranchId, int]:
        """Get summary of all branches."""
        async with self._lock:
            return {
                branch_id: len(thoughts)
                for branch_id, thoughts in self.branches.items()
            }

    def get_current_branch_id(self, thought: ThoughtData) -> str:
        """Get the current branch ID for a thought with improved logic."""
        return thought.branchId or "main"

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-mas-sequential-thinking"
version = "0.7.0"
description = "MCP Agent Implementation for Sequential Thinking"
readme = "README.md"
requires-python = ">=3.10"
authors = [
    { name = "Frad LEE", email = "[email protected]" },
    { name = "Alain Ivars", email = "[email protected]" },
]
dependencies = [
    "agno>=2.0.5",
    "asyncio",
    "exa-py",
    "python-dotenv",
    "mcp",
    "groq",
    "ollama",
    "openrouter",
    "httpx[socks]>=0.28.1",
    "sqlalchemy",
]

[project.optional-dependencies]
dev = [
    "pytest",
    "black",
    "isort",
    "mypy",
    "ruff",
]

[project.scripts]
mcp-server-mas-sequential-thinking = "mcp_server_mas_sequential_thinking.main:run"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_mas_sequential_thinking"]

[dependency-groups]
dev = [
    "pytest>=8.4.1",
]

[tool.ruff]
target-version = "py310"
line-length = 88
src = ["src", "tests"]

[tool.ruff.lint]
select = [
    "E",      # pycodestyle errors
    "W",      # pycodestyle warnings
    "F",      # pyflakes
    "I",      # isort
    "N",      # pep8-naming
    "D",      # pydocstyle
    "UP",     # pyupgrade
    "YTT",    # flake8-2020
    "ANN",    # flake8-annotations
    "ASYNC",  # flake8-async
    "S",      # flake8-bandit
    "BLE",    # flake8-blind-except
    "FBT",    # flake8-boolean-trap
    "B",      # flake8-bugbear
    "A",      # flake8-builtins
    "COM",    # flake8-commas
    "C4",     # flake8-comprehensions
    "DTZ",    # flake8-datetimez
    "T10",    # flake8-debugger
    "DJ",     # flake8-django
    "EM",     # flake8-errmsg
    "EXE",    # flake8-executable
    "FA",     # flake8-future-annotations
    "ISC",    # flake8-implicit-str-concat
    "ICN",    # flake8-import-conventions
    "G",      # flake8-logging-format
    "INP",    # flake8-no-pep420
    "PIE",    # flake8-pie
    "T20",    # flake8-print
    "PYI",    # flake8-pyi
    "PT",     # flake8-pytest-style
    "Q",      # flake8-quotes
    "RSE",    # flake8-raise
    "RET",    # flake8-return
    "SLF",    # flake8-self
    "SLOT",   # flake8-slots
    "SIM",    # flake8-simplify
    "TID",    # flake8-tidy-imports
    "TCH",    # flake8-type-checking
    "INT",    # flake8-gettext
    "ARG",    # flake8-unused-arguments
    "PTH",    # flake8-use-pathlib
    "ERA",    # eradicate
    "PD",     # pandas-vet
    "PGH",    # pygrep-hooks
    "PL",     # pylint
    "TRY",    # tryceratops
    "FLY",    # flynt
    "NPY",    # numpy
    "AIR",    # airflow
    "PERF",   # perflint
    "FURB",   # refurb
    "LOG",    # flake8-logging
    "RUF",    # ruff-specific rules
]
ignore = [
    "D100",    # Missing docstring in public module
    "D101",    # Missing docstring in public class
    "D102",    # Missing docstring in public method
    "D103",    # Missing docstring in public function
    "D104",    # Missing docstring in public package
    "D105",    # Missing docstring in magic method
    "D107",    # Missing docstring in __init__
    "COM812",  # Trailing comma missing
    "ISC001",  # Implicitly concatenated string literals on one line
    "FBT001",  # Boolean positional arg in function definition
    "FBT002",  # Boolean default positional argument in function definition
    "S101",    # Use of assert detected
    "PLR0913", # Too many arguments to function call
    "PLR2004", # Magic value used in comparison
    "TRY003",  # Avoid specifying long messages outside the exception class
    "EM101",   # Exception must not use a string literal, assign to variable first
    "EM102",   # Exception must not use an f-string literal, assign to variable first
]

[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101", "PLR2004", "ANN", "D"]
"__init__.py" = ["F401"]

[tool.ruff.lint.pydocstyle]
convention = "google"

[tool.ruff.lint.isort]
known-first-party = ["mcp_server_mas_sequential_thinking"]
split-on-trailing-comma = true

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/core/models.py:
--------------------------------------------------------------------------------

```python
"""Streamlined models with consolidated validation logic."""

from enum import Enum
from typing import Any

from pydantic import BaseModel, Field, model_validator

from mcp_server_mas_sequential_thinking.config.constants import (
    FieldLengthLimits,
    ValidationLimits,
)

from .types import BranchId, ThoughtNumber


class ThoughtType(Enum):
    """Types of thoughts in the sequential thinking process."""

    STANDARD = "standard"
    REVISION = "revision"
    BRANCH = "branch"


def _validate_thought_relationships(data: dict) -> None:
    """Validate thought relationships with optimized validation logic."""
    # Extract values once with modern dict methods
    data.get("isRevision", False)
    branch_from_thought = data.get("branchFromThought")
    branch_id = data.get("branchId")
    current_number = data.get("thoughtNumber")

    # Collect validation errors efficiently
    errors = []

    # Relationship validation with guard clauses
    if branch_id is not None and branch_from_thought is None:
        errors.append("branchId requires branchFromThought to be set")

    # Numeric validation with early exit
    if current_number is None:
        if errors:
            raise ValueError("; ".join(errors))
        return

    # Validate numeric relationships
    if branch_from_thought is not None and branch_from_thought >= current_number:
        errors.append("branchFromThought must be less than current thoughtNumber")

    if errors:
        raise ValueError("; ".join(errors))


class ThoughtData(BaseModel):
    """Streamlined thought data model with consolidated validation."""

    model_config = {"validate_assignment": True, "frozen": True}

    # Core fields
    thought: str = Field(
        ...,
        min_length=FieldLengthLimits.MIN_STRING_LENGTH,
        description="Content of the thought",
    )
    # MCP API compatibility - camelCase field names required
    thoughtNumber: ThoughtNumber = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Sequence number starting from 1",
    )
    totalThoughts: int = Field(  # noqa: N815
        ...,
        ge=1,
        description="Estimated total thoughts",
    )
    nextThoughtNeeded: bool = Field(  # noqa: N815
        ..., description="Whether another thought is needed"
    )

    # Required workflow fields
    isRevision: bool = Field(  # noqa: N815
        ..., description="Whether this revises a previous thought"
    )
    branchFromThought: ThoughtNumber | None = Field(  # noqa: N815
        ...,
        ge=ValidationLimits.MIN_THOUGHT_NUMBER,
        description="Thought number to branch from",
    )
    branchId: BranchId | None = Field(  # noqa: N815
        ..., description="Unique branch identifier"
    )
    needsMoreThoughts: bool = Field(  # noqa: N815
        ..., description="Whether more thoughts are needed beyond estimate"
    )

    @property
    def thought_type(self) -> ThoughtType:
        """Determine the type of thought based on field values."""
        if self.isRevision:
            return ThoughtType.REVISION
        if self.branchFromThought is not None:
            return ThoughtType.BRANCH
        return ThoughtType.STANDARD

    @model_validator(mode="before")
    @classmethod
    def validate_thought_data(cls, data: dict[str, Any]) -> dict[str, Any]:
        """Consolidated validation with simplified logic."""
        if isinstance(data, dict):
            _validate_thought_relationships(data)
        return data

    def format_for_log(self) -> str:
        """Format thought for logging with optimized type-specific formatting."""
        # Use match statement for modern Python pattern matching
        match self.thought_type:
            case ThoughtType.REVISION:
                prefix = (
                    f"Revision {self.thoughtNumber}/{self.totalThoughts} "
                    f"(revising #{self.branchFromThought})"
                )
            case ThoughtType.BRANCH:
                prefix = (
                    f"Branch {self.thoughtNumber}/{self.totalThoughts} "
                    f"(from #{self.branchFromThought}, ID: {self.branchId})"
                )
            case _:  # ThoughtType.STANDARD
                prefix = f"Thought {self.thoughtNumber}/{self.totalThoughts}"

        # Use multiline string formatting for better readability
        return (
            f"{prefix}\n"
            f"  Content: {self.thought}\n"
            f"  Next: {self.nextThoughtNeeded}, More: {self.needsMoreThoughts}"
        )

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/context_builder.py:
--------------------------------------------------------------------------------

```python
"""Context building service for thought processing.

This service handles building context-aware prompts from thought data,
managing session history, and constructing appropriate inputs for processing.
"""

from mcp_server_mas_sequential_thinking.core import SessionMemory, ThoughtData
from mcp_server_mas_sequential_thinking.utils import setup_logging

logger = setup_logging()


class ContextBuilder:
    """Service responsible for building context-aware prompts and managing thought context."""

    def __init__(self, session: SessionMemory) -> None:
        """Initialize the context builder with session memory.

        Args:
            session: The session memory instance for accessing thought history
        """
        self._session = session

    async def build_context_prompt(self, thought_data: ThoughtData) -> str:
        """Build context-aware input prompt with optimized string construction.

        This method creates contextual prompts based on thought type:
        - Revision thoughts include original content
        - Branch thoughts include origin content
        - Sequential thoughts use basic format

        Args:
            thought_data: The thought data to build context for

        Returns:
            Formatted prompt string with appropriate context
        """
        # Pre-calculate base components for efficiency
        base = f"Process Thought #{thought_data.thoughtNumber}:\n"
        content = f'\nThought Content: "{thought_data.thought}"'

        # Add context using pattern matching with optimized string building
        match thought_data:
            case ThoughtData(isRevision=True, branchFromThought=revision_num) if (
                revision_num
            ):
                original = await self._find_thought_content_safe(revision_num)
                context = f'**REVISION of Thought #{revision_num}** (Original: "{original}")\n'
                return f"{base}{context}{content}"

            case ThoughtData(branchFromThought=branch_from, branchId=branch_id) if (
                branch_from and branch_id
            ):
                origin = await self._find_thought_content_safe(branch_from)
                context = f'**BRANCH (ID: {branch_id}) from Thought #{branch_from}** (Origin: "{origin}")\n'
                return f"{base}{context}{content}"

            case _:
                return f"{base}{content}"

    async def _find_thought_content_safe(self, thought_number: int) -> str:
        """Safely find thought content with error handling.

        Args:
            thought_number: The thought number to find

        Returns:
            The thought content or a placeholder if not found
        """
        try:
            return await self._session.find_thought_content(thought_number)
        except Exception:
            return "[not found]"

    async def log_context_building(
        self, thought_data: ThoughtData, input_prompt: str
    ) -> None:
        """Log context building details for debugging and monitoring.

        Args:
            thought_data: The thought data being processed
            input_prompt: The built prompt
        """
        logger.info("📝 CONTEXT BUILDING:")

        if thought_data.isRevision and thought_data.branchFromThought:
            logger.info(
                "  Type: Revision of thought #%s", thought_data.branchFromThought
            )
            original = await self._find_thought_content_safe(
                thought_data.branchFromThought
            )
            logger.info("  Original thought: %s", original)
        elif thought_data.branchFromThought and thought_data.branchId:
            logger.info(
                "  Type: Branch '%s' from thought #%s",
                thought_data.branchId,
                thought_data.branchFromThought,
            )
            origin = await self._find_thought_content_safe(
                thought_data.branchFromThought
            )
            logger.info("  Branch origin: %s", origin)
        else:
            logger.info("  Type: Sequential thought #%s", thought_data.thoughtNumber)

        logger.info("  Session thoughts: %d total", len(self._session.thought_history))
        logger.info("  Input thought: %s", thought_data.thought)
        logger.info("  Built prompt length: %d chars", len(input_prompt))
        logger.info("  Built prompt:\n%s", input_prompt)

        # Use field length limits constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import FieldLengthLimits

            separator_length = FieldLengthLimits.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info("  %s", "=" * separator_length)

    def create_simplified_prompt(self, input_prompt: str) -> str:
        """Create a simplified prompt for single-agent processing.

        Args:
            input_prompt: The original input prompt

        Returns:
            Simplified prompt optimized for single-agent processing
        """
        return f"""Process this thought efficiently:

{input_prompt}

Provide a focused response with clear guidance for the next step."""

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/retry_handler.py:
--------------------------------------------------------------------------------

```python
"""Retry handling utilities for robust processing."""

import asyncio
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, TypeVar

from mcp_server_mas_sequential_thinking.config.constants import (
    DefaultTimeouts,
    PerformanceMetrics,
)
from mcp_server_mas_sequential_thinking.core.types import ThoughtProcessingError
from mcp_server_mas_sequential_thinking.infrastructure.logging_config import get_logger

logger = get_logger(__name__)

T = TypeVar("T")


@dataclass
class RetryConfig:
    """Configuration for retry operations."""

    max_attempts: int = DefaultTimeouts.MAX_RETRY_ATTEMPTS
    sleep_duration: float = PerformanceMetrics.RETRY_SLEEP_DURATION
    exponential_base: float = DefaultTimeouts.RETRY_EXPONENTIAL_BASE
    use_exponential_backoff: bool = False


class RetryHandler:
    """Handles retry logic with configurable strategies."""

    def __init__(self, config: RetryConfig | None = None) -> None:
        """Initialize retry handler with configuration."""
        self.config = config or RetryConfig()

    async def execute_with_retry(
        self,
        operation: Callable[[], Any],
        operation_name: str,
        context_info: dict | None = None,
    ) -> Any:
        """Execute operation with retry logic."""
        last_exception = None
        max_retries = self.config.max_attempts

        for retry_count in range(max_retries + 1):
            try:
                self._log_attempt(
                    retry_count, max_retries, operation_name, context_info
                )

                start_time = time.time()
                result = await operation()
                processing_time = time.time() - start_time

                self._log_success(operation_name, processing_time)
                return result

            except Exception as e:
                last_exception = e
                self._log_error(retry_count, max_retries, operation_name, e)

                if retry_count < max_retries:
                    await self._wait_before_retry(retry_count)
                else:
                    self._log_exhaustion(max_retries, operation_name)
                    raise ThoughtProcessingError(
                        f"{operation_name} failed after {max_retries + 1} attempts: {e}"
                    ) from e

        # Should never reach here, but provide safety
        raise ThoughtProcessingError(
            f"Unexpected error in retry logic for {operation_name}"
        ) from last_exception

    def _log_attempt(
        self,
        retry_count: int,
        max_retries: int,
        operation_name: str,
        context_info: dict | None,
    ) -> None:
        """Log retry attempt information."""
        logger.info(
            f"Processing attempt {retry_count + 1}/{max_retries + 1}: {operation_name}"
        )

        if context_info:
            for key, value in context_info.items():
                logger.info(f"  {key}: {value}")

    def _log_success(self, operation_name: str, processing_time: float) -> None:
        """Log successful operation completion."""
        logger.info(f"✅ {operation_name} completed in {processing_time:.3f}s")

    def _log_error(
        self, retry_count: int, max_retries: int, operation_name: str, error: Exception
    ) -> None:
        """Log error information."""
        logger.error(f"{operation_name} error on attempt {retry_count + 1}: {error}")

        if retry_count < max_retries:
            logger.info(f"Retrying... ({retry_count + 1}/{max_retries})")

    def _log_exhaustion(self, max_retries: int, operation_name: str) -> None:
        """Log retry exhaustion."""
        logger.error(f"All retry attempts exhausted for {operation_name}")

    async def _wait_before_retry(self, retry_count: int) -> None:
        """Wait before retry with optional exponential backoff."""
        if self.config.use_exponential_backoff:
            wait_time = self.config.sleep_duration * (
                self.config.exponential_base**retry_count
            )
        else:
            wait_time = self.config.sleep_duration

        await asyncio.sleep(wait_time)


class TeamProcessingRetryHandler(RetryHandler):
    """Specialized retry handler for team processing operations."""

    def __init__(self) -> None:
        """Initialize with team processing specific configuration."""
        config = RetryConfig(
            max_attempts=DefaultTimeouts.MAX_RETRY_ATTEMPTS,
            sleep_duration=PerformanceMetrics.RETRY_SLEEP_DURATION,
            use_exponential_backoff=True,
        )
        super().__init__(config)

    async def execute_team_processing(
        self, team_operation: Callable[[], Any], team_info: dict, complexity_level: str
    ) -> Any:
        """Execute team processing with specialized retry logic."""
        context_info = {
            "complexity": complexity_level,
            "team": team_info.get("name", "unknown"),
            "agents": team_info.get("member_count", 0),
            "leader": team_info.get("leader_model", "unknown"),
        }

        return await self.execute_with_retry(
            team_operation, "MULTI-AGENT TEAM PROCESSING", context_info
        )

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/response_processor.py:
--------------------------------------------------------------------------------

```python
"""Response processing utilities for consistent response handling."""

from dataclasses import dataclass
from typing import Any

from mcp_server_mas_sequential_thinking.infrastructure.logging_config import get_logger

logger = get_logger(__name__)


@dataclass
class ProcessedResponse:
    """Standardized response data structure."""

    content: str
    raw_response: Any
    processing_time: float | None = None
    metadata: dict | None = None


class ResponseExtractor:
    """Handles extraction of content from various response types."""

    @staticmethod
    def extract_content(response: Any) -> str:
        """Extract string content from various response formats."""
        if response is None:
            return ""

        # Handle string responses directly
        if isinstance(response, str):
            return response

        # Handle RunOutput from Agno framework
        if hasattr(response, "content"):
            content = response.content
            if isinstance(content, str):
                return content
            if isinstance(content, dict):
                # Extract from dict-based content
                return ResponseExtractor._extract_from_dict(content)
            if hasattr(content, "__str__"):
                return str(content)

        # Handle dictionary responses
        if isinstance(response, dict):
            return ResponseExtractor._extract_from_dict(response)

        # Handle objects with text/message attributes
        for attr in ["text", "message", "result", "output"]:
            if hasattr(response, attr):
                value = getattr(response, attr)
                if isinstance(value, str):
                    return value

        # Fallback to string conversion
        logger.warning(f"Unknown response type {type(response)}, converting to string")
        return str(response)

    @staticmethod
    def _extract_from_dict(content_dict: dict) -> str:
        """Extract content from dictionary-based responses."""
        # Common content keys in order of preference
        content_keys = ["content", "text", "message", "result", "output", "response"]

        for key in content_keys:
            if key in content_dict:
                value = content_dict[key]
                if isinstance(value, str):
                    return value
                if isinstance(value, dict) and "result" in value:
                    # Handle nested result structures
                    return str(value["result"])

        # If no standard key found, try to get first string value
        for value in content_dict.values():
            if isinstance(value, str) and value.strip():
                return value

        # Fallback to string representation
        return str(content_dict)


class ResponseProcessor:
    """Comprehensive response processing with logging and validation."""

    def __init__(self) -> None:
        """Initialize response processor."""
        self.extractor = ResponseExtractor()

    def process_response(
        self,
        response: Any,
        processing_time: float | None = None,
        context: str = "processing",
    ) -> ProcessedResponse:
        """Process response with extraction, validation, and logging."""
        content = self.extractor.extract_content(response)

        # Validate content
        if not content or not content.strip():
            logger.warning(f"Empty response content in {context}")
            content = f"[Empty response from {context}]"

        # Create metadata
        metadata = {
            "context": context,
            "response_type": type(response).__name__,
            "content_length": len(content),
            "has_content": bool(content.strip()),
        }

        processed = ProcessedResponse(
            content=content,
            raw_response=response,
            processing_time=processing_time,
            metadata=metadata,
        )

        self._log_response_details(processed, context)
        return processed

    def _log_response_details(self, processed: ProcessedResponse, context: str) -> None:
        """Log detailed response information."""
        logger.info(f"📝 {context.upper()} RESPONSE:")
        if processed.metadata:
            logger.info(f"  Type: {processed.metadata.get('response_type', 'unknown')}")
            logger.info(
                f"  Length: {processed.metadata.get('content_length', 0)} chars"
            )
        else:
            logger.info("  Type: unknown")
            logger.info(f"  Length: {len(processed.content)} chars")

        if processed.processing_time:
            logger.info(f"  Processing time: {processed.processing_time:.3f}s")

        # Log content preview (first 100 chars)
        content_preview = processed.content[:100]
        if len(processed.content) > 100:
            content_preview += "..."

        logger.info(f"  Preview: {content_preview}")


class ResponseFormatter:
    """Formats responses for consistent output."""

    @staticmethod
    def format_for_client(processed: ProcessedResponse) -> str:
        """Format processed response for client consumption."""
        content = processed.content.strip()

        # Ensure content ends with appropriate punctuation
        if content and not content.endswith((".", "!", "?", ":", ";")):
            content += "."

        return content

    @staticmethod
    def format_with_metadata(processed: ProcessedResponse) -> dict:
        """Format response with metadata for debugging."""
        return {
            "content": processed.content,
            "metadata": processed.metadata,
            "processing_time": processed.processing_time,
        }

```

--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.7.0] - 2025-09-24

### Added
- Parallel execution for thinking agents to improve processing performance
- Comprehensive Mermaid diagrams in documentation showing parallel processing flows
- Detailed agent descriptions in README files with multi-dimensional thinking methodology
- Comparison table with original TypeScript version highlighting architectural differences

### Changed
- **PERFORMANCE**: Converted non-synthesis agents to run in parallel using asyncio.gather for significant speed improvements
- **GROQ PROVIDER**: Updated Groq provider to use OpenAI GPT-OSS models (openai/gpt-oss-120b for enhanced, openai/gpt-oss-20b for standard)
- Complete restructure of README files with cleaner formatting and better organization
- Improved documentation clarity by removing all emoji characters from codebase and documentation

### Fixed
- Resolved MetricsLogger import error that was preventing server startup
- Fixed missing MetricsLogger class implementation in logging configuration
- Corrected Mermaid diagram syntax errors in README files
- Removed references to non-existent PerformanceTracker class

## [0.5.0] - 2025-09-17

### Added
- Comprehensive TDD test coverage for refactoring and quality improvement
- Default settings and processing strategy enum for enhanced configuration
- Adaptive architecture with cost optimization capabilities
- Comprehensive test infrastructure and unit tests
- Magic number extraction to constants for better maintainability

### Changed
- **BREAKING**: Migration to Agno v2.0 with architectural updates (~10,000x faster agent creation, ~50x less memory usage)
- Upgraded Agno to version 2.0.5 with enhanced agent features
- Reorganized types module and cleaned duplicates for better structure
- Modernized codebase with enhanced type safety and annotations
- Adopted src layout for Python project structure following best practices
- Optimized code structure and performance across modules

### Fixed
- Resolved mypy type checking errors across all modules
- Comprehensive security and quality improvements
- Updated minimum Agno version to 2.0.5 for compatibility

### Documentation
- Updated CLAUDE.md with Agno v2.0 migration details and corrected commands
- Enhanced guidance for src layout and development requirements
- Improved test documentation and GitHub provider information

## [0.4.1] - 2025-08-06

### Fixed
- app_lifespan function signature for FastMCP compatibility

### Changed
- Restructured main.py with modular architecture for better maintainability

## [0.4.0] - 2025-08-06

### Added
- Support for Kimi K2 model via OpenRouter integration
- Enhanced model provider options and configuration flexibility

### Changed
- CHANGELOG.md following Keep a Changelog standards
- Moved changelog from README.md to dedicated CHANGELOG.md file

## [0.3.0] - 2025-08-01

### Added
- Support for Ollama FULL LOCAL (no API key needed, but requires Ollama installed and running locally)
- Local LLM inference capabilities through Ollama integration
- Enhanced model configuration options for local deployment
- MseeP.ai security assessment badge

### Changed
- Restored DeepSeek as default LLM provider
- Improved package naming and configuration
- Updated dependencies to support local inference
- Enhanced agent memory management (disabled for individual agents)

### Fixed
- Package naming issues in configuration
- Dependency conflicts resolved
- Merge conflicts between branches

## [0.2.3] - 2025-04-22

### Changed
- Updated version alignment in project configuration and lock file

## [0.2.2] - 2025-04-10

### Changed
- Default agent model ID for DeepSeek changed from `deepseek-reasoner` to `deepseek-chat`
- Improved model selection recommendations

## [0.2.1] - 2025-04-10

### Changed
- Model selection recommendations updated in documentation
- Enhanced guidance for coordinator vs specialist model selection

## [0.2.0] - 2025-04-06

### Added
- Major refactoring of sequential thinking team structure
- Enhanced coordination logic
- Improved JSON output format
- LLM configuration and model selection enhancements

### Changed
- Agent model IDs updated for better performance
- Project structure improvements

## [0.1.3] - 2025-04-06

### Changed
- Project entry point script from `main:main` to `main:run`
- Updated documentation for improved user guidance
- Cleaned up dependencies in lock file

## [0.1.0] - 2025-04-06

### Added
- Initial project structure and configuration files
- Multi-Agent System (MAS) architecture using Agno framework
- Sequential thinking tool with coordinated specialist agents
- Support for multiple LLM providers (DeepSeek, Groq, OpenRouter)
- Pydantic validation for robust data integrity
- Integration with external tools (Exa for research)
- Structured logging with file and console output
- Support for thought revisions and branching
- MCP server implementation with FastMCP
- Distributed intelligence across specialized agents

[Unreleased]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.7.0...HEAD
[0.7.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.5.0...v0.7.0
[0.5.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.4.1...v0.5.0
[0.4.1]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.4.0...v0.4.1
[0.4.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.3.0...v0.4.0
[0.3.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.3...v0.3.0
[0.2.3]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.2...v0.2.3
[0.2.2]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.1...v0.2.2
[0.2.1]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.2.0...v0.2.1
[0.2.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.1.3...v0.2.0
[0.1.3]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/compare/v0.1.0...v0.1.3
[0.1.0]: https://github.com/FradSer/mcp-server-mas-sequential-thinking/releases/tag/v0.1.0
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/workflow_executor.py:
--------------------------------------------------------------------------------

```python
"""Workflow execution service for multi-thinking processing.

This service handles the execution of multi-thinking workflows,
managing workflow routing and coordination of processing strategies.
"""

import time
from typing import TYPE_CHECKING, Any

from mcp_server_mas_sequential_thinking.config.constants import PerformanceMetrics
from mcp_server_mas_sequential_thinking.core import SessionMemory, ThoughtData
from mcp_server_mas_sequential_thinking.utils import setup_logging

if TYPE_CHECKING:
    from mcp_server_mas_sequential_thinking.routing import MultiThinkingWorkflowResult

logger = setup_logging()


class WorkflowExecutor:
    """Service responsible for executing multi-thinking workflows."""

    def __init__(self, session: SessionMemory) -> None:
        """Initialize the workflow executor with session memory.

        Args:
            session: The session memory instance for accessing team and context
        """
        self._session = session
        self._agno_router: Any = None
        self._initialize_multi_thinking_workflow()

    def _initialize_multi_thinking_workflow(self) -> None:
        """Initialize multi-thinking workflow router.

        Uses dynamic import to avoid circular dependency issues.
        """
        logger.info("Initializing Multi-Thinking Workflow Router")
        # Dynamic import to avoid circular dependency
        from mcp_server_mas_sequential_thinking.routing import (  # noqa: PLC0415
            MultiThinkingWorkflowRouter,
        )

        self._agno_router = MultiThinkingWorkflowRouter()
        logger.info("✅ Multi-Thinking Workflow Router ready")

    async def execute_workflow(
        self, thought_data: ThoughtData, input_prompt: str, start_time: float
    ) -> tuple[str, "MultiThinkingWorkflowResult", float]:
        """Execute multi-thinking workflow for the given thought.

        Args:
            thought_data: The thought data to process
            input_prompt: The context-aware input prompt
            start_time: Processing start time for metrics

        Returns:
            Tuple of (final_response, workflow_result, total_time)
        """
        # Execute multi-thinking workflow
        workflow_result: MultiThinkingWorkflowResult = (
            await self._agno_router.process_thought_workflow(thought_data, input_prompt)
        )

        total_time = time.time() - start_time

        return workflow_result.content, workflow_result, total_time

    def log_workflow_completion(
        self,
        thought_data: ThoughtData,
        workflow_result: "MultiThinkingWorkflowResult",
        total_time: float,
        final_response: str,
    ) -> None:
        """Log workflow completion with multi-thinking specific metrics.

        Args:
            thought_data: The processed thought data
            workflow_result: The workflow execution result
            total_time: Total processing time
            final_response: The final formatted response
        """
        # Basic completion info
        completion_metrics = {
            f"Thought #{thought_data.thoughtNumber}": "processed successfully",
            "Strategy": workflow_result.strategy_used,
            "Complexity Score": f"{workflow_result.complexity_score:.1f}/100",
            "Step": workflow_result.step_name,
            "Processing time": f"{workflow_result.processing_time:.3f}s",
            "Total time": f"{total_time:.3f}s",
            "Response length": f"{len(final_response)} chars",
        }

        self._log_metrics_block(
            "🧠 MULTI-THINKING WORKFLOW COMPLETION:", completion_metrics
        )
        self._log_separator()

        # Performance metrics
        execution_consistency = self._calculate_execution_consistency(
            workflow_result.strategy_used != "error_fallback"
        )
        efficiency_score = self._calculate_efficiency_score(
            workflow_result.processing_time
        )

        performance_metrics = {
            "Execution Consistency": execution_consistency,
            "Efficiency Score": efficiency_score,
            "Response Length": f"{len(final_response)} chars",
            "Strategy Executed": workflow_result.strategy_used,
        }
        self._log_metrics_block("📊 WORKFLOW PERFORMANCE METRICS:", performance_metrics)

    def _log_metrics_block(self, title: str, metrics: dict[str, Any]) -> None:
        """Log a formatted metrics block.

        Args:
            title: The title for the metrics block
            metrics: Dictionary of metrics to log
        """
        logger.info("%s", title)
        for key, value in metrics.items():
            if isinstance(value, float):
                logger.info("  %s: %.2f", key, value)
            else:
                logger.info("  %s: %s", key, value)

    def _log_separator(self, length: int = 60) -> None:
        """Log a separator line.

        Args:
            length: Length of the separator line
        """
        # Use performance metrics constant
        length = PerformanceMetrics.SEPARATOR_LENGTH
        logger.info("  %s", "=" * length)

    def _calculate_efficiency_score(self, processing_time: float) -> float:
        """Calculate efficiency score using standard metrics.

        Args:
            processing_time: The processing time in seconds

        Returns:
            Efficiency score between 0 and 1
        """
        # Use constants from PerformanceMetrics
        perfect_score = PerformanceMetrics.PERFECT_EFFICIENCY_SCORE
        threshold = PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD
        minimum_score = PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE

        return (
            perfect_score
            if processing_time < threshold
            else max(minimum_score, threshold / processing_time)
        )

    def _calculate_execution_consistency(self, success_indicator: bool) -> float:
        """Calculate execution consistency using standard metrics.

        Args:
            success_indicator: Whether execution was successful

        Returns:
            Execution consistency score
        """
        # Use constants from PerformanceMetrics
        perfect_consistency = PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY
        default_consistency = PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY

        return perfect_consistency if success_indicator else default_consistency

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/infrastructure/logging_config.py:
--------------------------------------------------------------------------------

```python
"""Streamlined logging configuration based on Python best practices.

Replaces complex 985-line implementation with focused, performance-optimized approach.
"""

import logging
import logging.handlers
import os
import sys
from pathlib import Path


def setup_logging(level: str | None = None) -> logging.Logger:
    """Setup streamlined logging with environment-based configuration.

    Args:
        level: Log level override. If None, uses LOG_LEVEL env var or defaults to INFO.

    Returns:
        Configured logger instance for the application.
    """
    # Determine log level from environment or parameter
    log_level = level or os.getenv("LOG_LEVEL", "INFO")

    try:
        numeric_level = getattr(logging, log_level.upper())
    except AttributeError:
        numeric_level = logging.INFO

    # Create logs directory
    log_dir = Path.home() / ".sequential_thinking" / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    # Configure root logger for this application
    logger = logging.getLogger("sequential_thinking")
    logger.setLevel(numeric_level)

    # Clear any existing handlers to avoid duplicates
    logger.handlers.clear()

    # Console handler for development/debugging
    if os.getenv("ENVIRONMENT") != "production":
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(numeric_level)
        console_formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S"
        )
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

    # File handler with rotation for persistent logging
    log_file = log_dir / "sequential_thinking.log"
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=5 * 1024 * 1024,  # 5MB
        backupCount=3,
        encoding="utf-8",
    )
    file_handler.setLevel(numeric_level)
    file_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
    )
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    # Prevent propagation to root logger to avoid duplicates
    logger.propagate = False

    return logger


def get_logger(name: str | None = None) -> logging.Logger:
    """Get a logger instance with consistent configuration.

    Args:
        name: Logger name. If None, uses calling module's name.

    Returns:
        Logger instance.
    """
    if name is None:
        # Get caller's module name for better traceability
        import inspect

        frame = inspect.currentframe().f_back
        name = frame.f_globals.get("__name__", "sequential_thinking")

    return logging.getLogger(name)


def log_performance_metric(
    logger: logging.Logger, operation: str, duration: float, **kwargs
) -> None:
    """Log performance metrics in consistent format.

    Uses lazy evaluation to avoid string formatting overhead.
    """
    if logger.isEnabledFor(logging.INFO):
        extras = ", ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else ""
        logger.info(
            "Performance: %s completed in %.2fs%s",
            operation,
            duration,
            f" ({extras})" if extras else "",
        )


def log_routing_decision(
    logger: logging.Logger, strategy: str, complexity: float, reasoning: str = ""
) -> None:
    """Log AI routing decisions with consistent structure."""
    logger.info(
        "AI Routing: strategy=%s, complexity=%.1f%s",
        strategy,
        complexity,
        f", reason={reasoning}" if reasoning else "",
    )


def log_thought_processing(
    logger: logging.Logger,
    stage: str,
    thought_number: int,
    thought_length: int = 0,
    **context,
) -> None:
    """Log thought processing stages with structured data."""
    if logger.isEnabledFor(logging.INFO):
        ctx_str = ", ".join(f"{k}={v}" for k, v in context.items()) if context else ""
        logger.info(
            "Thought Processing: stage=%s, number=%d, length=%d%s",
            stage,
            thought_number,
            thought_length,
            f", {ctx_str}" if ctx_str else "",
        )


class LogTimer:
    """Context manager for timing operations with automatic logging."""

    def __init__(
        self, logger: logging.Logger, operation: str, level: int = logging.INFO
    ) -> None:
        self.logger = logger
        self.operation = operation
        self.level = level
        self.start_time = None

    def __enter__(self):
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Starting: %s", self.operation)

        import time

        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        import time

        duration = time.time() - self.start_time

        if exc_type is None:
            if self.logger.isEnabledFor(self.level):
                self.logger.log(
                    self.level, "Completed: %s (%.2fs)", self.operation, duration
                )
        else:
            self.logger.error(
                "Failed: %s (%.2fs) - %s", self.operation, duration, exc_val
            )


# Legacy compatibility - maintain existing function names but with simplified implementation
def create_logger(name: str) -> logging.Logger:
    """Legacy compatibility function."""
    return get_logger(name)


def configure_logging(level: str = "INFO") -> logging.Logger:
    """Legacy compatibility function."""
    return setup_logging(level)


class MetricsLogger:
    """Simple metrics logger for structured logging output."""

    def __init__(self, logger_name: str = "sequential_thinking") -> None:
        """Initialize metrics logger with specified logger name."""
        self.logger = logging.getLogger(logger_name)

    def log_metrics_block(self, title: str, metrics: dict) -> None:
        """Log a block of metrics with a title.

        Args:
            title: Block title to display
            metrics: Dictionary of metrics to log
        """
        if not self.logger.isEnabledFor(logging.INFO):
            return

        self.logger.info("%s", title)
        for key, value in metrics.items():
            self.logger.info("  %s: %s", key, value)

    def log_separator(self, length: int = 60) -> None:
        """Log a separator line.

        Args:
            length: Length of the separator line
        """
        if self.logger.isEnabledFor(logging.INFO):
            self.logger.info("-" * length)

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/response_formatter.py:
--------------------------------------------------------------------------------

```python
"""Response formatting service for thought processing.

This service handles formatting and synthesizing responses from various processing
modes, extracting content from Agno RunOutput objects, and preparing final responses.
"""

from mcp_server_mas_sequential_thinking.core import ThoughtData
from mcp_server_mas_sequential_thinking.utils import setup_logging

logger = setup_logging()


class ResponseFormatter:
    """Service responsible for formatting and synthesizing responses."""

    def __init__(self) -> None:
        """Initialize the response formatter."""

    def format_response(self, content: str, thought_data: ThoughtData) -> str:
        """Format response for MCP - clean content without additional guidance.

        MCP servers should return clean content and let the AI decide next steps.

        Args:
            content: The raw response content to format
            thought_data: The thought data context

        Returns:
            Formatted response string
        """
        # MCP servers should return clean content, let AI decide next steps
        final_response = content

        # Log response formatting details
        self._log_response_formatting(content, thought_data, final_response)

        return final_response

    def extract_response_content(self, response) -> str:
        """Extract clean content from Agno RunOutput objects.

        Handles various response types and extracts text content properly.

        Args:
            response: The response object from Agno processing

        Returns:
            Extracted text content
        """
        # Import ResponseExtractor to handle the extraction
        from mcp_server_mas_sequential_thinking.services.response_processor import (
            ResponseExtractor,
        )

        return ResponseExtractor.extract_content(response)

    def _log_response_formatting(
        self, content: str, thought_data: ThoughtData, final_response: str
    ) -> None:
        """Log response formatting details for debugging and monitoring.

        Args:
            content: The original response content
            thought_data: The thought data context
            final_response: The final formatted response
        """
        logger.info("📤 RESPONSE FORMATTING:")
        logger.info(f"  Original content length: {len(content)} chars")
        logger.info(f"  Next needed: {thought_data.nextThoughtNeeded}")
        logger.info(f"  Final response length: {len(final_response)} chars")
        logger.info(f"  Final response:\n{final_response}")

        # Use field length limits constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import FieldLengthLimits

            separator_length = FieldLengthLimits.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")

    def log_input_details(
        self, input_prompt: str, context_description: str = "input"
    ) -> None:
        """Log input details with consistent formatting.

        Args:
            input_prompt: The input prompt to log
            context_description: Description of the context
        """
        logger.info(f"  Input length: {len(input_prompt)} chars")
        logger.info(f"  Full {context_description}:\\n{input_prompt}")

        # Use performance metrics constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import PerformanceMetrics

            separator_length = PerformanceMetrics.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")

    def log_output_details(
        self,
        response_content: str,
        processing_time: float,
        context_description: str = "response",
    ) -> None:
        """Log output details with consistent formatting.

        Args:
            response_content: The response content to log
            processing_time: Processing time in seconds
            context_description: Description of the context
        """
        logger.info(f"  Processing time: {processing_time:.3f}s")
        logger.info(f"  Output length: {len(response_content)} chars")
        logger.info(f"  Full {context_description}:\\n{response_content}")

        # Use performance metrics constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import PerformanceMetrics

            separator_length = PerformanceMetrics.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")


class ResponseExtractor:
    """Utility class for extracting content from various response types."""

    @staticmethod
    def extract_content(response) -> str:
        """Extract clean content from Agno RunOutput objects.

        This method handles various response formats and extracts the actual
        text content that should be returned to the user.

        Args:
            response: The response object from agent processing

        Returns:
            Extracted text content as string
        """
        # Handle string responses directly
        if isinstance(response, str):
            return response.strip()

        # Handle Agno RunOutput objects
        if hasattr(response, "content"):
            content = response.content
            if isinstance(content, str):
                return content.strip()
            if isinstance(content, list):
                # Handle list of content items
                text_parts = []
                for item in content:
                    if isinstance(item, str):
                        text_parts.append(item)
                    elif hasattr(item, "text"):
                        text_parts.append(item.text)
                    elif hasattr(item, "content"):
                        text_parts.append(str(item.content))
                return "\n".join(text_parts).strip()

        # Handle objects with text attribute
        if hasattr(response, "text"):
            return response.text.strip()

        # Handle objects with message attribute
        if hasattr(response, "message"):
            message = response.message
            if isinstance(message, str):
                return message.strip()
            if hasattr(message, "content"):
                return str(message.content).strip()

        # Fallback: convert to string
        return str(response).strip()

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/thought_processor_refactored.py:
--------------------------------------------------------------------------------

```python
"""Refactored ThoughtProcessor using dependency injection and single responsibility services.

This module provides a clean, maintainable implementation of thought processing
that delegates responsibilities to specialized services following clean architecture.
"""

import time

from mcp_server_mas_sequential_thinking.config import ProcessingDefaults
from mcp_server_mas_sequential_thinking.core import (
    ProcessingMetadata,
    SessionMemory,
    ThoughtData,
    ThoughtProcessingError,
)
from mcp_server_mas_sequential_thinking.infrastructure import (
    MetricsLogger,
)
from mcp_server_mas_sequential_thinking.utils import setup_logging

from .context_builder import ContextBuilder
from .processing_orchestrator import ProcessingOrchestrator
from .response_formatter import ResponseFormatter
from .response_processor import ResponseProcessor
from .retry_handler import TeamProcessingRetryHandler
from .workflow_executor import WorkflowExecutor

logger = setup_logging()


class ThoughtProcessor:
    """Refactored thought processor using dependency injection and clean architecture.

    This class orchestrates thought processing by delegating specific responsibilities
    to specialized services, maintaining a clean separation of concerns.
    """

    __slots__ = (
        "_context_builder",
        "_metrics_logger",
        "_performance_tracker",
        "_processing_orchestrator",
        "_response_formatter",
        "_session",
        "_workflow_executor",
    )

    def __init__(self, session: SessionMemory) -> None:
        """Initialize the thought processor with dependency injection.

        Args:
            session: The session memory instance for accessing team and context
        """
        self._session = session

        # Initialize core services with dependency injection
        self._context_builder = ContextBuilder(session)
        self._workflow_executor = WorkflowExecutor(session)
        self._response_formatter = ResponseFormatter()

        # Initialize supporting services
        response_processor = ResponseProcessor()
        retry_handler = TeamProcessingRetryHandler()
        self._processing_orchestrator = ProcessingOrchestrator(
            session, response_processor, retry_handler
        )

        # Initialize monitoring services
        self._metrics_logger = MetricsLogger()

        logger.info("ThoughtProcessor initialized with specialized services")

    async def process_thought(self, thought_data: ThoughtData) -> str:
        """Process a thought through the appropriate workflow with comprehensive error handling.

        This is the main public API method that maintains backward compatibility
        while using the new service-based architecture internally.

        Args:
            thought_data: The thought data to process

        Returns:
            Processed thought response

        Raises:
            ThoughtProcessingError: If processing fails
        """
        try:
            return await self._process_thought_internal(thought_data)
        except Exception as e:
            error_msg = f"Failed to process {thought_data.thought_type.value} thought #{thought_data.thoughtNumber}: {e}"
            logger.error(error_msg, exc_info=True)
            metadata: ProcessingMetadata = {
                "error_count": ProcessingDefaults.ERROR_COUNT_INITIAL,
                "retry_count": ProcessingDefaults.RETRY_COUNT_INITIAL,
                "processing_time": ProcessingDefaults.PROCESSING_TIME_INITIAL,
            }
            raise ThoughtProcessingError(error_msg, metadata) from e

    async def _process_thought_internal(self, thought_data: ThoughtData) -> str:
        """Internal thought processing logic using specialized services.

        Args:
            thought_data: The thought data to process

        Returns:
            Processed thought response
        """
        start_time = time.time()

        # Log thought data and add to session (now async for thread safety)
        self._log_thought_data(thought_data)
        await self._session.add_thought(thought_data)

        # Build context using specialized service (now async for thread safety)
        input_prompt = await self._context_builder.build_context_prompt(thought_data)
        await self._context_builder.log_context_building(thought_data, input_prompt)

        # Execute Multi-Thinking workflow using specialized service
        (
            content,
            workflow_result,
            total_time,
        ) = await self._workflow_executor.execute_workflow(
            thought_data, input_prompt, start_time
        )

        # Format response using specialized service
        final_response = self._response_formatter.format_response(content, thought_data)

        # Log workflow completion
        self._workflow_executor.log_workflow_completion(
            thought_data, workflow_result, total_time, final_response
        )

        return final_response

    def _log_thought_data(self, thought_data: ThoughtData) -> None:
        """Log comprehensive thought data information using centralized logger.

        Args:
            thought_data: The thought data to log
        """
        basic_info = {
            f"Thought #{thought_data.thoughtNumber}": f"{thought_data.thoughtNumber}/{thought_data.totalThoughts}",
            "Type": thought_data.thought_type.value,
            "Content": thought_data.thought,
            "Next needed": thought_data.nextThoughtNeeded,
            "Needs more": thought_data.needsMoreThoughts,
        }

        # Add conditional fields
        if thought_data.isRevision:
            basic_info["Is revision"] = (
                f"True (revises thought #{thought_data.branchFromThought})"
            )
        if thought_data.branchFromThought:
            basic_info["Branch from"] = (
                f"#{thought_data.branchFromThought} (ID: {thought_data.branchId})"
            )

        basic_info["Raw data"] = thought_data.format_for_log()

        self._metrics_logger.log_metrics_block("🧩 THOUGHT DATA:", basic_info)

        # Use field length limits constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import FieldLengthLimits

            separator_length = FieldLengthLimits.SEPARATOR_LENGTH
        except ImportError:
            pass

        self._metrics_logger.log_separator(separator_length)

    # Legacy methods for backward compatibility - these delegate to orchestrator
    async def _execute_single_agent_processing(
        self, input_prompt: str, routing_decision=None
    ) -> str:
        """Legacy method - delegates to orchestrator for backward compatibility."""
        return await self._processing_orchestrator.execute_single_agent_processing(
            input_prompt, simplified=True
        )

    async def _execute_team_processing(self, input_prompt: str) -> str:
        """Legacy method - delegates to orchestrator for backward compatibility."""
        return await self._processing_orchestrator.execute_team_processing(input_prompt)

    def _extract_response_content(self, response) -> str:
        """Legacy method - delegates to formatter for backward compatibility."""
        return self._response_formatter.extract_response_content(response)

    def _build_context_prompt(self, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to context builder for backward compatibility."""
        return self._context_builder.build_context_prompt(thought_data)

    def _format_response(self, content: str, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to formatter for backward compatibility."""
        return self._response_formatter.format_response(content, thought_data)

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/core/types.py:
--------------------------------------------------------------------------------

```python
"""Type definitions for better type safety."""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Protocol, TypedDict

if TYPE_CHECKING:
    from agno.agent import Agent
    from agno.models.base import Model
    from agno.team.team import Team

    from mcp_server_mas_sequential_thinking.config.modernized_config import ModelConfig
    from mcp_server_mas_sequential_thinking.core.models import ThoughtData

# Type aliases for better semantic meaning
ThoughtNumber = int
BranchId = str
ProviderName = str
TeamType = str
AgentType = str
ConfigDict = dict[str, Any]
InstructionsList = list[str]
SuccessCriteriaList = list[str]


class ExecutionMode(Enum):
    """Execution modes for different routing strategies."""

    SINGLE_AGENT = "single_agent"
    SELECTIVE_TEAM = "selective_team"  # Hybrid with specific specialists
    FULL_TEAM = "full_team"  # Complete multi-agent team


@dataclass
class CoordinationPlan:
    """Comprehensive plan combining routing and coordination decisions."""

    # Routing decisions - using string values for simplicity
    strategy: str  # ProcessingStrategy value
    complexity_level: str  # ComplexityLevel value
    complexity_score: float

    # Coordination decisions
    execution_mode: ExecutionMode
    specialist_roles: list[str]
    task_breakdown: list[str]
    coordination_strategy: str

    # Execution parameters
    timeout_seconds: float
    expected_interactions: int
    team_size: int

    # Reasoning and metadata
    reasoning: str
    confidence: float
    original_thought: str

    @classmethod
    def from_routing_decision(
        cls, routing_decision: dict[str, Any], thought_data: ThoughtData
    ) -> CoordinationPlan:
        """Create coordination plan from adaptive routing decision."""
        # Map ProcessingStrategy to ExecutionMode
        strategy_to_mode = {
            "single_agent": ExecutionMode.SINGLE_AGENT,
            "hybrid": ExecutionMode.SELECTIVE_TEAM,
            "multi_agent": ExecutionMode.FULL_TEAM,
        }

        # Map complexity to specialist roles
        complexity_to_specialists: dict[str, list[str]] = {
            "simple": ["general"],
            "moderate": ["planner", "analyzer"],
            "complex": ["planner", "researcher", "analyzer", "critic"],
            "highly_complex": [
                "planner",
                "researcher",
                "analyzer",
                "critic",
                "synthesizer",
            ],
        }

        execution_mode = strategy_to_mode.get(
            routing_decision.strategy.value, ExecutionMode.SINGLE_AGENT
        )
        specialists = complexity_to_specialists.get(
            routing_decision.complexity_level.value, ["general"]
        )

        return cls(
            strategy=routing_decision.strategy.value,
            complexity_level=routing_decision.complexity_level.value,
            complexity_score=routing_decision.complexity_score,
            execution_mode=execution_mode,
            specialist_roles=specialists,
            team_size=len(specialists),
            coordination_strategy="adaptive_routing_based",
            task_breakdown=[
                f"Process {thought_data.thought_type.value} thought",
                "Generate guidance",
            ],
            expected_interactions=len(specialists),
            timeout_seconds=300.0,  # Default timeout
            reasoning=routing_decision.reasoning,
            confidence=0.8,  # Default confidence for rule-based routing
            original_thought=thought_data.thought,
        )


class ProcessingMetadata(TypedDict, total=False):
    """Type-safe processing metadata structure."""

    strategy: str
    complexity_score: float
    estimated_cost: float
    actual_cost: float
    token_usage: int
    processing_time: float
    specialists: list[str]
    provider: str
    routing_reasoning: str
    error_count: int
    retry_count: int


class SessionStats(TypedDict, total=False):
    """Type-safe session statistics structure."""

    total_thoughts: int
    total_cost: float
    total_tokens: int
    average_processing_time: float
    error_rate: float
    successful_thoughts: int
    failed_thoughts: int


class ComplexityMetrics(TypedDict):
    """Type-safe complexity analysis metrics."""

    word_count: int
    sentence_count: int
    question_count: int
    technical_terms: int
    has_branching: bool
    has_research_keywords: bool
    has_analysis_keywords: bool
    overall_score: float


class ModelProvider(Protocol):
    """Protocol for model provider implementations."""

    id: str
    cost_per_token: float


class AgentFactory(Protocol):
    """Protocol for agent factory implementations."""

    def create_team_agents(self, model: Model, team_type: str) -> dict[str, Agent]:
        """Create team agents with specified model and team type."""
        ...


class TeamBuilder(Protocol):
    """Protocol for team builder implementations."""

    def build_team(self, config: ConfigDict, agent_factory: AgentFactory) -> Team:
        """Build a team with specified configuration and agent factory."""
        ...


class CostEstimator(Protocol):
    """Protocol for cost estimation with type safety."""

    def estimate_cost(
        self, strategy: str, complexity_score: float, provider: str
    ) -> tuple[tuple[int, int], float]:
        """Estimate cost for processing strategy."""
        ...


class ComplexityAnalyzer(Protocol):
    """Protocol for complexity analysis with type safety."""

    def analyze(self, thought_text: str) -> ComplexityMetrics:
        """Analyze thought complexity and return metrics."""
        ...


class ThoughtProcessor(Protocol):
    """Protocol for thought processing with type safety."""

    async def process_thought(self, thought_data: ThoughtData) -> str:
        """Process a thought and return the result."""
        ...


class SessionManager(Protocol):
    """Protocol for session management with type safety."""

    def add_thought(self, thought_data: ThoughtData) -> None:
        """Add a thought to the session."""
        ...

    def find_thought_content(self, thought_number: int) -> str:
        """Find thought content by number."""
        ...

    def get_branch_summary(self) -> dict[str, int]:
        """Get summary of all branches."""
        ...


class ConfigurationProvider(Protocol):
    """Protocol for configuration management with type safety."""

    def get_model_config(self, provider_name: str | None = None) -> ModelConfig:
        """Get model configuration."""
        ...

    def check_required_api_keys(self, provider_name: str | None = None) -> list[str]:
        """Check for required API keys."""
        ...


# Custom Exception Classes
class ValidationError(ValueError):
    """Exception raised when data validation fails."""


class ConfigurationError(Exception):
    """Exception raised when configuration is invalid."""


class ThoughtProcessingError(Exception):
    """Exception raised when thought processing fails."""

    def __init__(
        self, message: str, metadata: ProcessingMetadata | None = None
    ) -> None:
        super().__init__(message)
        self.metadata: ProcessingMetadata = metadata or {}


class TeamCreationError(Exception):
    """Exception raised when team creation fails."""


class RoutingDecisionError(ThoughtProcessingError):
    """Error in adaptive routing decision making."""


class CostOptimizationError(ThoughtProcessingError):
    """Error in cost optimization logic."""


class PersistentStorageError(ThoughtProcessingError):
    """Error in persistent memory storage."""


class ModelConfigurationError(ConfigurationError):
    """Error in model configuration."""


class ProviderError(Exception):
    """Error related to LLM providers."""


class AgentCreationError(Exception):
    """Error in agent creation."""

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/main.py:
--------------------------------------------------------------------------------

```python
"""Refactored MCP Sequential Thinking Server with separated concerns and reduced complexity."""

import asyncio
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from html import escape

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import ValidationError

from .config import ProcessingDefaults, SecurityConstants, ValidationLimits
from .core import ThoughtProcessingError

# Import refactored modules
from .services import (
    ServerConfig,
    ServerState,
    ThoughtProcessor,
    create_server_lifespan,
    create_validated_thought_data,
)
from .utils import setup_logging

# Initialize environment and logging
load_dotenv()
logger = setup_logging()

# Global server state with thread safety
_server_state: ServerState | None = None
_thought_processor: ThoughtProcessor | None = None
_processor_lock = asyncio.Lock()


@asynccontextmanager
async def app_lifespan(app) -> AsyncIterator[None]:
    """Simplified application lifespan using refactored server core."""
    global _server_state, _thought_processor

    logger.info("Starting Sequential Thinking Server")

    async with create_server_lifespan() as server_state:
        _server_state = server_state
        logger.info("Server ready for requests")
        yield

    _server_state = None
    _thought_processor = None
    logger.info("Server stopped")


# Initialize FastMCP with lifespan
mcp = FastMCP(lifespan=app_lifespan)


def sanitize_and_validate_input(text: str, max_length: int, field_name: str) -> str:
    """Sanitize and validate input with comprehensive security checks."""
    # Early validation with guard clause
    if not text or not text.strip():
        raise ValueError(f"{field_name} cannot be empty")

    # Strip and normalize whitespace first
    text = text.strip()

    # Check for potential prompt injection patterns using centralized constants
    text_lower = text.lower()
    for pattern in SecurityConstants.INJECTION_PATTERNS:
        if pattern in text_lower:
            raise ValueError(
                f"Potential security risk detected in {field_name}. "
                f"Input contains suspicious pattern: '{pattern}'"
            )

    # Additional security checks
    if text.count('"') > 10 or text.count("'") > 10:
        raise ValueError(
            f"Excessive quotation marks detected in {field_name}. "
            "This may indicate an injection attempt."
        )

    # Sanitize HTML entities and special characters
    sanitized_text = escape(text)

    # Length validation with descriptive error
    if len(sanitized_text) > max_length:
        raise ValueError(
            f"{field_name} exceeds maximum length of {max_length} characters"
        )

    return sanitized_text


@mcp.prompt("sequential-thinking")
def sequential_thinking_prompt(problem: str, context: str = "") -> list[dict]:
    """Enhanced starter prompt for sequential thinking with better formatting."""
    # Sanitize and validate inputs
    try:
        problem = sanitize_and_validate_input(
            problem, ValidationLimits.MAX_PROBLEM_LENGTH, "Problem statement"
        )
        context = (
            sanitize_and_validate_input(
                context, ValidationLimits.MAX_CONTEXT_LENGTH, "Context"
            )
            if context
            else ""
        )
    except ValueError as e:
        raise ValueError(f"Input validation failed: {e}")

    user_prompt = f"""Initiate sequential thinking for: {problem}
{f"Context: {context}" if context else ""}"""

    assistant_guide = f"""Starting sequential thinking process for: {problem}

Process Guidelines:
1. Estimate appropriate number of total thoughts based on problem complexity
2. Begin with: "Plan comprehensive analysis for: {problem}"
3. Use revisions (isRevision=True) to improve previous thoughts
4. Use branching (branchFromThought, branchId) for alternative approaches
5. Each thought should be detailed with clear reasoning
6. Progress systematically through analysis phases

System Architecture:
- Multi-Thinking methodology with intelligent routing
- Factual, Emotional, Critical, Optimistic, Creative, and Synthesis perspectives
- Adaptive thinking sequence based on thought complexity and type
- Comprehensive integration through Synthesis thinking

Ready to begin systematic analysis."""

    return [
        {
            "description": "Enhanced sequential thinking starter with comprehensive guidelines",
            "messages": [
                {"role": "user", "content": {"type": "text", "text": user_prompt}},
                {
                    "role": "assistant",
                    "content": {"type": "text", "text": assistant_guide},
                },
            ],
        }
    ]


@mcp.tool()
async def sequentialthinking(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: bool,
    branchFromThought: int | None,
    branchId: str | None,
    needsMoreThoughts: bool,
) -> str:
    """Advanced sequential thinking tool with multi-agent coordination.

    Processes thoughts through a specialized team of AI agents that coordinate
    to provide comprehensive analysis, planning, research, critique, and synthesis.

    Args:
        thought: Content of the thinking step (required)
        thoughtNumber: Sequence number starting from {ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE} (≥{ThoughtProcessingLimits.MIN_THOUGHT_SEQUENCE})
        totalThoughts: Estimated total thoughts required (≥1)
        nextThoughtNeeded: Whether another thought step follows this one
        isRevision: Whether this thought revises a previous thought
        branchFromThought: Thought number to branch from for alternative exploration
        branchId: Unique identifier for the branch (required if branchFromThought set)
        needsMoreThoughts: Whether more thoughts are needed beyond the initial estimate

    Returns:
        Synthesized response from the multi-agent team with guidance for next steps

    Raises:
        ProcessingError: When thought processing fails
        ValidationError: When input validation fails
        RuntimeError: When server state is invalid
    """
    # Capture server state locally to avoid async race conditions
    current_server_state = _server_state
    if current_server_state is None:
        return "Server Error: Server not initialized"

    try:
        # Create and validate thought data using refactored function
        thought_data = create_validated_thought_data(
            thought=thought,
            thoughtNumber=thoughtNumber,
            totalThoughts=totalThoughts,
            nextThoughtNeeded=nextThoughtNeeded,
            isRevision=isRevision,
            branchFromThought=branchFromThought,
            branchId=branchId,
            needsMoreThoughts=needsMoreThoughts,
        )

        # Use captured state directly to avoid race conditions
        global _thought_processor
        async with _processor_lock:
            if _thought_processor is None:
                logger.info(
                    "Initializing ThoughtProcessor with Multi-Thinking workflow"
                )
                _thought_processor = ThoughtProcessor(current_server_state.session)

        result = await _thought_processor.process_thought(thought_data)

        logger.info(f"Successfully processed thought #{thoughtNumber}")
        return result

    except ValidationError as e:
        error_msg = f"Input validation failed for thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        return f"Validation Error: {e}"

    except ThoughtProcessingError as e:
        error_msg = f"Processing failed for thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        if hasattr(e, "metadata") and e.metadata:
            logger.exception(f"Error metadata: {e.metadata}")
        return f"Processing Error: {e}"

    except Exception as e:
        error_msg = f"Unexpected error processing thought #{thoughtNumber}: {e}"
        logger.exception(error_msg)
        return f"Unexpected Error: {e}"


def run() -> None:
    """Run the MCP server with enhanced error handling and graceful shutdown."""
    config = ServerConfig.from_environment()
    logger.info(f"Starting Sequential Thinking Server with {config.provider} provider")

    try:
        # Run server with stdio transport
        mcp.run(transport="stdio")

    except KeyboardInterrupt:
        logger.info("Server stopped by user (SIGINT)")

    except SystemExit as e:
        logger.info(f"Server stopped with exit code: {e.code}")
        raise

    except Exception as e:
        logger.error(f"Critical server error: {e}", exc_info=True)
        sys.exit(ProcessingDefaults.EXIT_CODE_ERROR)

    finally:
        logger.info("Server shutdown sequence complete")


def main() -> None:
    """Main entry point with proper error handling."""
    try:
        run()
    except Exception as e:
        logger.critical(f"Fatal error in main: {e}", exc_info=True)
        sys.exit(ProcessingDefaults.EXIT_CODE_ERROR)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/config/constants.py:
--------------------------------------------------------------------------------

```python
"""Constants for the MCP Sequential Thinking Server."""

from enum import Enum
from typing import ClassVar


class TokenCosts:
    """Token cost constants for different providers (cost per 1000 tokens)."""

    DEEPSEEK_COST_PER_1K = 0.0002
    GROQ_COST_PER_1K = 0.0001
    OPENROUTER_COST_PER_1K = 0.001
    GITHUB_COST_PER_1K = 0.0005
    OLLAMA_COST_PER_1K = 0.0000
    DEFAULT_COST_PER_1K = 0.0002


class ComplexityScoring:
    """Complexity analysis scoring constants."""

    MAX_SCORE = 100.0
    WORD_COUNT_MAX_POINTS = 15
    WORD_COUNT_DIVISOR = 20
    SENTENCE_MULTIPLIER = 2
    SENTENCE_MAX_POINTS = 10
    QUESTION_MULTIPLIER = 3
    QUESTION_MAX_POINTS = 15
    TECHNICAL_TERM_MULTIPLIER = 2
    TECHNICAL_TERM_MAX_POINTS = 20
    BRANCHING_MULTIPLIER = 5
    BRANCHING_MAX_POINTS = 15
    RESEARCH_MULTIPLIER = 3
    RESEARCH_MAX_POINTS = 15
    ANALYSIS_MULTIPLIER = 2
    ANALYSIS_MAX_POINTS = 10


class TokenEstimates:
    """Token estimation ranges by complexity and strategy."""

    # Single agent token estimates (min, max)
    SINGLE_AGENT_SIMPLE = (400, 800)
    SINGLE_AGENT_MODERATE = (600, 1200)
    SINGLE_AGENT_COMPLEX = (800, 1600)
    SINGLE_AGENT_HIGHLY_COMPLEX = (1000, 2000)

    # Multi-agent token estimates (min, max)
    MULTI_AGENT_SIMPLE = (2000, 4000)
    MULTI_AGENT_MODERATE = (3000, 6000)
    MULTI_AGENT_COMPLEX = (4000, 8000)
    MULTI_AGENT_HIGHLY_COMPLEX = (5000, 10000)


class ValidationLimits:
    """Input validation and system limits."""

    MAX_PROBLEM_LENGTH = 500
    MAX_CONTEXT_LENGTH = 300
    MAX_THOUGHTS_PER_SESSION = 1000
    MAX_BRANCHES_PER_SESSION = 50
    MAX_THOUGHTS_PER_BRANCH = 100
    GITHUB_TOKEN_LENGTH = 40
    MIN_THOUGHT_NUMBER = 1


class DefaultTimeouts:
    """Default timeout values in seconds."""

    PROCESSING_TIMEOUT = 30.0
    DEEPSEEK_PROCESSING_TIMEOUT = 120.0  # Legacy timeout for Deepseek (deprecated)
    MULTI_AGENT_TIMEOUT_MULTIPLIER = 2.0  # Multiply timeout for multi-agent

    # Adaptive timeout strategy (v0.5.1+)
    ADAPTIVE_BASE_DEEPSEEK = 90.0  # Base timeout for Deepseek with adaptive scaling
    ADAPTIVE_BASE_GROQ = 20.0  # Base timeout for Groq
    ADAPTIVE_BASE_OPENAI = 45.0  # Base timeout for OpenAI
    ADAPTIVE_BASE_DEFAULT = 30.0  # Default base timeout

    # Maximum timeouts (safety ceiling)
    MAX_TIMEOUT_DEEPSEEK = 300.0  # 5 minutes absolute maximum for Deepseek
    MAX_TIMEOUT_DEFAULT = 180.0  # 3 minutes maximum for others

    # Complexity multipliers for adaptive timeouts
    COMPLEXITY_SIMPLE_MULTIPLIER = 1.0
    COMPLEXITY_MODERATE_MULTIPLIER = 1.5
    COMPLEXITY_COMPLEX_MULTIPLIER = 2.0
    COMPLEXITY_HIGHLY_COMPLEX_MULTIPLIER = 3.0

    # Retry configuration
    RETRY_EXPONENTIAL_BASE = 1.5  # Exponential backoff base
    MAX_RETRY_ATTEMPTS = 2  # Maximum retry attempts

    SESSION_CLEANUP_DAYS = 30
    RECENT_SESSION_KEEP_COUNT = 100


class LoggingLimits:
    """Logging configuration constants."""

    LOG_FILE_MAX_BYTES = 5 * 1024 * 1024  # 5MB
    LOG_BACKUP_COUNT = 3
    SENSITIVE_DATA_MIN_LENGTH = 8


class QualityThresholds:
    """Quality scoring and budget utilization thresholds."""

    DEFAULT_QUALITY_THRESHOLD = 0.7
    HIGH_BUDGET_UTILIZATION = 0.8
    VERY_HIGH_BUDGET_UTILIZATION = 0.9
    MULTI_AGENT_HIGH_USAGE = 0.7
    SINGLE_AGENT_HIGH_USAGE = 0.8
    MINIMUM_USAGE_FOR_SUGGESTIONS = 10
    SIGNIFICANT_COST_THRESHOLD = 0.01


class ProviderDefaults:
    """Default provider configurations."""

    DEFAULT_QUALITY_SCORE = 0.8
    DEFAULT_RESPONSE_TIME = 2.0
    DEFAULT_UPTIME_SCORE = 0.95
    DEFAULT_ERROR_RATE = 0.05
    DEFAULT_CONTEXT_LENGTH = 4096


class ComplexityThresholds:
    """Complexity level thresholds for scoring."""

    SIMPLE_MAX = 25.0
    MODERATE_MAX = 50.0
    COMPLEX_MAX = 75.0
    # HIGHLY_COMPLEX is anything above COMPLEX_MAX


class DefaultValues:
    """Default configuration values."""

    DEFAULT_LLM_PROVIDER = "deepseek"
    DEFAULT_TEAM_MODE = "standard"
    DEFAULT_LOG_LEVEL = "INFO"
    DEFAULT_MAX_RETRIES = 3
    DEFAULT_TIMEOUT = 30.0


class PerformanceMetrics:
    """Performance measurement constants."""

    # Efficiency calculation thresholds
    EFFICIENCY_TIME_THRESHOLD = 60.0  # seconds
    PERFECT_EXECUTION_CONSISTENCY = 1.0
    DEFAULT_EXECUTION_CONSISTENCY = 0.9
    PERFECT_EFFICIENCY_SCORE = 1.0
    MINIMUM_EFFICIENCY_SCORE = 0.5

    # Retry and sleep constants
    RETRY_SLEEP_DURATION = 1.0  # seconds

    # Logging formatting
    SEPARATOR_LENGTH = 50


class ProcessingDefaults:
    """Default values for thought processing."""

    ERROR_COUNT_INITIAL = 1
    RETRY_COUNT_INITIAL = 0
    PROCESSING_TIME_INITIAL = 0.0
    TEAM_INITIALIZER_INDEX = 1
    SINGLE_AGENT_TIMEOUT_MULTIPLIER = 0.5
    EXIT_CODE_ERROR = 1


class FieldLengthLimits:
    """Field length limits for various inputs."""

    MIN_STRING_LENGTH = 1
    MAX_STANDARD_STRING = 2000
    MAX_DESCRIPTION_LENGTH = 1000
    MAX_BRANCH_ID_LENGTH = 100
    SEPARATOR_LENGTH = 50


class DatabaseConstants:
    """Database configuration constants."""

    SESSION_CLEANUP_BATCH_SIZE = 100
    THOUGHT_QUERY_LIMIT = 1000
    CONNECTION_POOL_SIZE = 5
    CONNECTION_POOL_OVERFLOW = 10


class ThoughtProcessingLimits:
    """Limits for thought processing workflow."""

    MIN_THOUGHT_SEQUENCE = 1
    MAX_TEAM_DELEGATION_COUNT = 2
    ANALYSIS_TIME_LIMIT_SECONDS = 5
    MIN_PROCESSING_STEPS = 1
    MAX_PROCESSING_STEPS = 6


class TechnicalTerms:
    """Technical terms for complexity analysis."""

    KEYWORDS: ClassVar[list[str]] = [
        "algorithm",
        "data",
        "analysis",
        "system",
        "process",
        "design",
        "implementation",
        "architecture",
        "framework",
        "model",
        "structure",
        "optimization",
        "performance",
        "scalability",
        "integration",
        "api",
        "database",
        "security",
        "authentication",
        "authorization",
        "testing",
        "deployment",
        "configuration",
        "monitoring",
        "logging",
        "debugging",
        "refactoring",
        "migration",
        "synchronization",
        "caching",
        "protocol",
        "interface",
        "inheritance",
        "polymorphism",
        "abstraction",
        "encapsulation",
    ]


class DefaultSettings:
    """Default application settings."""

    DEFAULT_PROVIDER = "deepseek"
    DEFAULT_COMPLEXITY_THRESHOLD = 30.0
    DEFAULT_TOKEN_BUFFER = 0.2
    DEFAULT_SESSION_TIMEOUT = 3600


class CostOptimizationConstants:
    """Constants for cost optimization calculations."""

    # Quality scoring weights
    QUALITY_WEIGHT = 0.4
    COST_WEIGHT = 0.3
    SPEED_WEIGHT = 0.2
    RELIABILITY_WEIGHT = 0.1

    # Cost calculation factors
    COST_NORMALIZATION_FACTOR = 0.0003
    COST_EPSILON = 0.0001  # Prevent division by zero
    DEFAULT_COST_ESTIMATE = 0.0002
    SPEED_NORMALIZATION_BASE = 10
    SPEED_THRESHOLD = 1

    # Quality scoring bounds
    MIN_QUALITY_SCORE = 0.0
    MAX_QUALITY_SCORE = 1.0

    # Budget utilization thresholds
    HIGH_BUDGET_UTILIZATION = 0.8
    MODERATE_BUDGET_UTILIZATION = 0.7
    CRITICAL_BUDGET_UTILIZATION = 0.9

    # Complexity bonuses
    QUALITY_COMPLEXITY_BONUS = 0.2
    COST_COMPLEXITY_BONUS = 0.0001

    # Provider optimization
    HIGH_USAGE_PENALTY = 2.0
    MODERATE_USAGE_PENALTY = 0.5
    QUALITY_UPDATE_WEIGHT = 0.1
    OLD_QUALITY_WEIGHT = 0.9

    # Usage analysis thresholds
    MIN_DATA_THRESHOLD = 10
    HIGH_MULTI_AGENT_RATIO = 0.7
    HIGH_SINGLE_AGENT_RATIO = 0.8
    MINIMUM_COST_DIFFERENCE = 0.01

    # Provider-specific configurations
    GROQ_RATE_LIMIT = 14400
    GROQ_CONTEXT_LENGTH = 32768
    GROQ_QUALITY_SCORE = 0.75
    GROQ_RESPONSE_TIME = 0.8

    DEEPSEEK_QUALITY_SCORE = 0.85
    DEEPSEEK_CONTEXT_LENGTH = 128000

    GITHUB_CONTEXT_LENGTH = 128000

    OPENROUTER_RESPONSE_TIME = 3.0
    OPENROUTER_CONTEXT_LENGTH = 200000

    OLLAMA_QUALITY_SCORE = 0.70
    OLLAMA_RESPONSE_TIME = 5.0
    OLLAMA_CONTEXT_LENGTH = 8192


class ComplexityAnalysisConstants:
    """Constants for complexity analysis calculations."""

    # Multilingual text analysis requires different tokenization strategies
    CHINESE_WORD_RATIO = 2  # Character-based scripts need different word boundaries
    CHINESE_DOMINANCE_THRESHOLD = 0.3  # Script detection threshold for optimization

    # Complexity scoring weights (extracted from adaptive_routing.py)
    WORD_COUNT_WEIGHT = 0.15
    SENTENCE_WEIGHT = 0.10
    QUESTION_WEIGHT = 0.15
    TECHNICAL_TERM_WEIGHT = 0.20
    BRANCHING_WEIGHT = 0.15
    RESEARCH_WEIGHT = 0.15
    ANALYSIS_DEPTH_WEIGHT = 0.10

    # Complexity level thresholds
    SIMPLE_THRESHOLD = 25.0
    MODERATE_THRESHOLD = 50.0
    COMPLEX_THRESHOLD = 75.0

    # Branching bonus for actual branch context
    BRANCHING_CONTEXT_BONUS = 2

    # Text analysis limits
    MAX_PREVIEW_LENGTH = 200
    MIN_SENTENCE_LENGTH = 3


class SecurityConstants:
    """Security-related constants for input validation."""

    # Patterns that indicate potential prompt injection attempts
    INJECTION_PATTERNS: ClassVar[list[str]] = [
        # System/role instruction injections
        "system:",
        "user:",
        "assistant:",
        "role:",
        # Prompt escape attempts
        "ignore previous",
        "ignore all",
        "disregard",
        # Code execution attempts
        "```python",
        "```bash",
        "exec(",
        "eval(",
        "__import__",
        # Instruction manipulation
        "new instructions",
        "override",
        "instead of",
        # Data extraction attempts
        "print(",
        "console.log",
        "alert(",
        "document.cookie",
    ]


class ProcessingStrategy(Enum):
    """Processing strategy enumeration."""

    SINGLE_AGENT = "single_agent"
    MULTI_AGENT = "multi_agent"
    ADAPTIVE = "adaptive"

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/ai_complexity_analyzer.py:
--------------------------------------------------------------------------------

```python
"""AI-Powered Complexity Analyzer.

Uses an AI agent to intelligently assess thought complexity, replacing the rule-based approach
with more nuanced understanding of context, semantics, and depth.
"""

import json
import logging
from typing import TYPE_CHECKING, Any

from agno.agent import Agent

from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config

from .complexity_types import ComplexityAnalyzer, ComplexityMetrics

if TYPE_CHECKING:
    from mcp_server_mas_sequential_thinking.core.models import ThoughtData

logger = logging.getLogger(__name__)


COMPLEXITY_ANALYSIS_PROMPT = """
You are an expert complexity analyzer for thought processing systems. Your task is to analyze the cognitive complexity of a given thought and return a structured assessment.

Analyze the following thought and provide complexity metrics:

**Thought to Analyze:** "{thought}"

**Instructions:**
1. Consider semantic depth, philosophical implications, conceptual complexity
2. Evaluate required cognitive resources (memory, reasoning, creativity)
3. Assess multi-dimensional thinking requirements
4. Consider cultural and linguistic nuances across different languages

**Response Format:** Return ONLY a valid JSON object with these exact fields:
```json
{{
    "complexity_score": <float 0-100>,
    "word_count": <int>,
    "sentence_count": <int>,
    "question_count": <int>,
    "technical_terms": <int>,
    "branching_references": <int>,
    "research_indicators": <int>,
    "analysis_depth": <int>,
    "philosophical_depth_boost": <int 0-15>,
    "primary_problem_type": "<FACTUAL|EMOTIONAL|CRITICAL|OPTIMISTIC|CREATIVE|SYNTHESIS|EVALUATIVE|PHILOSOPHICAL|DECISION>",
    "thinking_modes_needed": ["<list of required thinking modes>"],
    "reasoning": "<brief explanation of scoring and problem type analysis>"
}}
```

**Scoring Guidelines:**
- 0-10: Simple factual questions or basic statements
- 11-25: Moderate complexity, requires some analysis
- 26-50: Complex topics requiring deep thinking
- 51-75: Highly complex, multi-faceted problems
- 76-100: Extremely complex philosophical/existential questions

**Problem Type Analysis:**
- FACTUAL: Information seeking, definitions, statistics (what, when, where, who)
- EMOTIONAL: Feelings, intuition, personal experiences (feel, sense, worry)
- CRITICAL: Risk assessment, problems, disadvantages (issue, risk, wrong)
- OPTIMISTIC: Benefits, opportunities, positive aspects (good, benefit, advantage)
- CREATIVE: Innovation, alternatives, new ideas (creative, brainstorm, imagine)
- SYNTHESIS: Integration, summary, holistic view (combine, overall, strategy)
- EVALUATIVE: Comparison, assessment, judgment (compare, evaluate, best)
- PHILOSOPHICAL: Meaning, existence, values (purpose, meaning, ethics)
- DECISION: Choice making, selection, recommendations (decide, choose, should)

**Thinking Modes Needed:**
Select appropriate modes based on problem characteristics:
- FACTUAL thinking for information gathering
- EMOTIONAL thinking for intuitive insights
- CRITICAL thinking for risk analysis
- OPTIMISTIC thinking for opportunity identification
- CREATIVE thinking for innovation
- SYNTHESIS thinking for integration

**Special Considerations:**
- Philosophical questions like "Why do we live if we die?" should score 40-70+
- Short but profound questions can have high complexity
- Consider emotional and existential weight, not just length
- Multilingual philosophical concepts preserve cultural context

Analyze now:
"""


class AIComplexityAnalyzer(ComplexityAnalyzer):
    """AI-powered complexity analyzer using language models."""

    def __init__(self, model_config: Any | None = None) -> None:
        self.model_config = model_config or get_model_config()
        self._agent: Agent | None = None

    def _get_agent(self) -> Agent:
        """Lazy initialization of the analysis agent."""
        if self._agent is None:
            model = self.model_config.create_agent_model()
            self._agent = Agent(
                name="ComplexityAnalyzer",
                model=model,
                introduction="You are an expert in cognitive complexity assessment, specializing in philosophy and deep thinking analysis.",
            )
        return self._agent

    async def analyze(self, thought_data: "ThoughtData") -> ComplexityMetrics:
        """Analyze thought complexity using AI agent."""
        logger.info("🤖 AI COMPLEXITY ANALYSIS:")
        logger.info(f"  📝 Analyzing: {thought_data.thought[:100]}...")

        try:
            agent = self._get_agent()
            prompt = COMPLEXITY_ANALYSIS_PROMPT.format(thought=thought_data.thought)

            # Get AI analysis
            result = await agent.arun(input=prompt)

            # Extract JSON response
            response_text = self._extract_response_content(result)
            complexity_data = self._parse_json_response(response_text)

            # Create metrics object with AI assessment
            metrics = ComplexityMetrics(
                complexity_score=complexity_data.get("complexity_score", 0.0),
                word_count=complexity_data.get("word_count", 0),
                sentence_count=complexity_data.get("sentence_count", 0),
                question_count=complexity_data.get("question_count", 0),
                technical_terms=complexity_data.get("technical_terms", 0),
                branching_references=complexity_data.get("branching_references", 0),
                research_indicators=complexity_data.get("research_indicators", 0),
                analysis_depth=complexity_data.get("analysis_depth", 0),
                philosophical_depth_boost=complexity_data.get(
                    "philosophical_depth_boost", 0
                ),
                # AI Analysis Results (critical for routing)
                primary_problem_type=complexity_data.get(
                    "primary_problem_type", "GENERAL"
                ),
                thinking_modes_needed=complexity_data.get(
                    "thinking_modes_needed", ["SYNTHESIS"]
                ),
                analyzer_type="ai",
                reasoning=complexity_data.get("reasoning", "AI analysis"),
            )

            logger.info(f"  🎯 AI Complexity Score: {metrics.complexity_score:.1f}/100")
            logger.info(
                f"  💭 Reasoning: {complexity_data.get('reasoning', 'No reasoning provided')[:100]}..."
            )

            return metrics

        except Exception as e:
            logger.exception(f"❌ AI complexity analysis failed: {e}")
            # Fallback to basic analysis
            return self._basic_fallback_analysis(thought_data)

    def _extract_response_content(self, result: Any) -> str:
        """Extract content from agent response."""
        if hasattr(result, "content"):
            return str(result.content)
        return str(result)

    def _parse_json_response(self, response_text: str) -> dict[str, Any]:
        """Parse JSON from AI response, handling various formats."""
        # Try to find JSON in the response
        lines = response_text.strip().split("\n")

        for line in lines:
            line = line.strip()
            if line.startswith("{") and line.endswith("}"):
                try:
                    return json.loads(line)
                except json.JSONDecodeError:
                    continue

        # Try to extract JSON from code blocks
        if "```json" in response_text:
            start = response_text.find("```json") + 7
            end = response_text.find("```", start)
            if end > start:
                json_text = response_text[start:end].strip()
                try:
                    return json.loads(json_text)
                except json.JSONDecodeError:
                    pass

        # Try parsing the entire response as JSON
        try:
            return json.loads(response_text)
        except json.JSONDecodeError:
            logger.warning(
                f"Failed to parse AI response as JSON: {response_text[:200]}"
            )
            raise ValueError("Could not parse AI complexity analysis response")

    def _basic_fallback_analysis(
        self, thought_data: "ThoughtData"
    ) -> ComplexityMetrics:
        """Fallback to basic analysis if AI fails."""
        logger.warning("🔄 Falling back to basic complexity analysis")

        text = thought_data.thought.lower()

        # Basic metrics
        words = len(text.split())
        sentences = len([s for s in text.split(".") if s.strip()])
        questions = text.count("?") + text.count("?")

        # Simple heuristics
        philosophical_terms = [
            "意义",
            "存在",
            "生命",
            "死亡",
            "为什么",
            "why",
            "meaning",
            "life",
            "death",
        ]
        philosophical_count = sum(1 for term in philosophical_terms if term in text)

        # Basic scoring
        base_score = min(words * 2 + questions * 5 + philosophical_count * 10, 100)

        return ComplexityMetrics(
            complexity_score=base_score,
            word_count=words,
            sentence_count=max(sentences, 1),
            question_count=questions,
            technical_terms=philosophical_count,
            branching_references=0,
            research_indicators=0,
            analysis_depth=philosophical_count,
            philosophical_depth_boost=min(philosophical_count * 5, 15),
            # Basic AI analysis results for fallback
            primary_problem_type="PHILOSOPHICAL"
            if philosophical_count > 0
            else "GENERAL",
            thinking_modes_needed=["SYNTHESIS", "CREATIVE"]
            if philosophical_count > 2
            else ["FACTUAL"],
            analyzer_type="basic_fallback",
            reasoning="Fallback analysis due to AI failure",
        )


# No more monkey patching needed - complexity_score is now a direct field


def create_ai_complexity_analyzer() -> AIComplexityAnalyzer:
    """Create AI complexity analyzer instance."""
    return AIComplexityAnalyzer()

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/processing_orchestrator.py:
--------------------------------------------------------------------------------

```python
"""Processing orchestration service for thought coordination.

This service handles the orchestration of different processing strategies,
coordinating between single-agent and multi-agent approaches, and managing
execution flows based on complexity analysis.
"""

import time
from typing import TYPE_CHECKING

from agno.agent import Agent

from mcp_server_mas_sequential_thinking.config import get_model_config
from mcp_server_mas_sequential_thinking.core import (
    SessionMemory,
    ThoughtProcessingError,
)
from mcp_server_mas_sequential_thinking.utils import setup_logging

if TYPE_CHECKING:
    from mcp_server_mas_sequential_thinking.routing import ComplexityLevel
    from mcp_server_mas_sequential_thinking.services.response_processor import (
        ResponseProcessor,
    )
    from mcp_server_mas_sequential_thinking.services.retry_handler import (
        TeamProcessingRetryHandler,
    )

logger = setup_logging()


class ProcessingOrchestrator:
    """Service responsible for orchestrating different processing strategies."""

    def __init__(
        self,
        session: SessionMemory,
        response_processor: "ResponseProcessor",
        retry_handler: "TeamProcessingRetryHandler",
    ) -> None:
        """Initialize the processing orchestrator.

        Args:
            session: The session memory instance
            response_processor: Response processing service
            retry_handler: Retry handling service
        """
        self._session = session
        self._response_processor = response_processor
        self._retry_handler = retry_handler

        # Initialize performance tracking
        from mcp_server_mas_sequential_thinking.infrastructure import (
            MetricsLogger,
        )

        self._metrics_logger = MetricsLogger()

    async def execute_single_agent_processing(
        self, input_prompt: str, simplified: bool = False
    ) -> str:
        """Execute single-agent processing for simple thoughts.

        Args:
            input_prompt: The input prompt to process
            simplified: Whether to use simplified prompt format

        Returns:
            Processed response content
        """
        try:
            # Create a lightweight agent for single processing
            simple_agent = self._create_simple_agent(
                processing_type="simple thought" if simplified else "thought",
                use_markdown=not simplified,
            )

            # Optionally simplify the prompt
            prompt_to_use = (
                self._create_simplified_prompt(input_prompt)
                if simplified
                else input_prompt
            )

            # Log single agent call details
            self._log_single_agent_call(simple_agent, prompt_to_use)

            start_time = time.time()
            response = await simple_agent.arun(prompt_to_use)
            processing_time = time.time() - start_time

            # Extract content from Agno RunOutput
            response_content = self._extract_response_content(response)

            # Log single agent response details
            self._log_single_agent_response(response_content, processing_time)

            logger.info("Single-agent processing completed successfully")
            return response_content

        except Exception as e:
            logger.warning(f"Single-agent processing failed, falling back to team: {e}")
            # Fallback to team processing
            return await self.execute_team_processing(input_prompt)

    async def execute_team_processing(self, input_prompt: str) -> str:
        """Execute team processing without timeout restrictions.

        Args:
            input_prompt: The input prompt to process

        Returns:
            Processed response content
        """
        try:
            response = await self._session.team.arun(input_prompt)
            return self._extract_response_content(response)
        except Exception as e:
            raise ThoughtProcessingError(f"Team coordination failed: {e}") from e

    async def execute_team_processing_with_retries(
        self, input_prompt: str, complexity_level: "ComplexityLevel"
    ) -> str:
        """Execute team processing using centralized retry handler.

        Args:
            input_prompt: The input prompt to process
            complexity_level: Complexity level for retry strategy

        Returns:
            Processed response content
        """
        team_info = self._get_team_info()
        self._metrics_logger.log_team_details(team_info)
        self._metrics_logger.log_input_details(input_prompt)

        async def team_operation():
            start_time = time.time()
            response = await self._session.team.arun(input_prompt)
            processing_time = time.time() - start_time

            processed_response = self._response_processor.process_response(
                response, processing_time, "MULTI-AGENT TEAM"
            )

            self._performance_tracker.record_processing(processing_time, True)
            return processed_response.content

        return await self._retry_handler.execute_team_processing(
            team_operation, team_info, complexity_level.value
        )

    def _create_simple_agent(
        self, processing_type: str = "thought", use_markdown: bool = False
    ) -> Agent:
        """Create a simple agent for single-thought processing.

        Args:
            processing_type: Type of processing for instructions
            use_markdown: Whether to enable markdown formatting

        Returns:
            Configured Agent instance
        """
        model_config = get_model_config()
        single_model = model_config.create_team_model()

        return Agent(
            name="SimpleProcessor",
            role="Simple Thought Processor",
            description=f"Processes {processing_type}s efficiently without multi-agent overhead",
            model=single_model,
            instructions=[
                f"You are processing a {processing_type} efficiently.",
                "Provide a focused, clear response.",
                "Include guidance for the next step.",
                "Be concise but helpful.",
            ],
            markdown=use_markdown,
        )

    def _create_simplified_prompt(self, input_prompt: str) -> str:
        """Create a simplified prompt for single-agent processing.

        Args:
            input_prompt: The original input prompt

        Returns:
            Simplified prompt
        """
        return f"""Process this thought efficiently:

{input_prompt}

Provide a focused response with clear guidance for the next step."""

    def _extract_response_content(self, response) -> str:
        """Extract clean content from Agno RunOutput objects.

        Args:
            response: The response object from agent processing

        Returns:
            Extracted text content
        """
        from mcp_server_mas_sequential_thinking.services.response_formatter import (
            ResponseExtractor,
        )

        return ResponseExtractor.extract_content(response)

    def _get_team_info(self) -> dict:
        """Extract team information for logging and retry handling.

        Returns:
            Dictionary containing team information
        """
        team = self._session.team
        return {
            "name": team.name,
            "member_count": len(team.members),
            "leader_class": team.model.__class__.__name__,
            "leader_model": getattr(team.model, "id", "unknown"),
            "member_names": ", ".join([m.name for m in team.members]),
        }

    def _log_single_agent_call(self, agent: Agent, prompt: str) -> None:
        """Log single agent call details.

        Args:
            agent: The agent being used
            prompt: The prompt being processed
        """
        logger.info("🤖 SINGLE-AGENT CALL:")
        logger.info(f"  Agent: {agent.name} ({agent.role})")
        logger.info(
            f"  Model: {getattr(agent.model, 'id', 'unknown')} ({agent.model.__class__.__name__})"
        )
        self._log_input_details(prompt)

    def _log_single_agent_response(self, content: str, processing_time: float) -> None:
        """Log single agent response details.

        Args:
            content: The response content
            processing_time: Processing time in seconds
        """
        logger.info("✅ SINGLE-AGENT RESPONSE:")
        self._log_output_details(content, processing_time)

    def _log_input_details(
        self, input_prompt: str, context_description: str = "input"
    ) -> None:
        """Log input details with consistent formatting.

        Args:
            input_prompt: The input prompt to log
            context_description: Description of the context
        """
        logger.info(f"  Input length: {len(input_prompt)} chars")
        logger.info(f"  Full {context_description}:\\n{input_prompt}")

        # Use performance metrics constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import PerformanceMetrics

            separator_length = PerformanceMetrics.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")

    def _log_output_details(
        self,
        response_content: str,
        processing_time: float,
        context_description: str = "response",
    ) -> None:
        """Log output details with consistent formatting.

        Args:
            response_content: The response content to log
            processing_time: Processing time in seconds
            context_description: Description of the context
        """
        logger.info(f"  Processing time: {processing_time:.3f}s")
        logger.info(f"  Output length: {len(response_content)} chars")
        logger.info(f"  Full {context_description}:\\n{response_content}")

        # Use performance metrics constant if available
        separator_length = 60  # Default fallback
        try:
            from mcp_server_mas_sequential_thinking.config import PerformanceMetrics

            separator_length = PerformanceMetrics.SEPARATOR_LENGTH
        except ImportError:
            pass

        logger.info(f"  {'=' * separator_length}")

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/server_core.py:
--------------------------------------------------------------------------------

```python
"""Refactored server core with separated concerns and reduced complexity."""

import os
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Lazy import to break circular dependency
from pydantic import ValidationError

from mcp_server_mas_sequential_thinking.config import (
    DefaultTimeouts,
    DefaultValues,
    PerformanceMetrics,
    check_required_api_keys,
)
from mcp_server_mas_sequential_thinking.core import (
    ConfigurationError,
    SessionMemory,
    ThoughtData,
)
from mcp_server_mas_sequential_thinking.utils import setup_logging

logger = setup_logging()


class LoggingMixin:
    """Mixin class providing common logging utilities with reduced duplication."""

    @staticmethod
    def _log_section_header(
        title: str, separator_length: int = PerformanceMetrics.SEPARATOR_LENGTH
    ) -> None:
        """Log a formatted section header."""
        logger.info(f"{title}")

    @staticmethod
    def _log_metrics_block(title: str, metrics: dict[str, Any]) -> None:
        """Log a formatted metrics block."""
        logger.info(f"{title}")
        for key, value in metrics.items():
            if isinstance(value, float):
                logger.info(f"  {key}: {value:.2f}")
            elif isinstance(value, (int, str)):
                logger.info(f"  {key}: {value}")
            else:
                logger.info(f"  {key}: {value}")

    @staticmethod
    def _log_separator(length: int = PerformanceMetrics.SEPARATOR_LENGTH) -> None:
        """Log a separator line."""
        logger.info(f"  {'=' * length}")

    @staticmethod
    def _calculate_efficiency_score(processing_time: float) -> float:
        """Calculate efficiency score using standard metrics."""
        return (
            PerformanceMetrics.PERFECT_EFFICIENCY_SCORE
            if processing_time < PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD
            else max(
                PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE,
                PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD / processing_time,
            )
        )

    @staticmethod
    def _calculate_execution_consistency(success_indicator: bool) -> float:
        """Calculate execution consistency using standard metrics."""
        return (
            PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY
            if success_indicator
            else PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY
        )


@dataclass(frozen=True, slots=True)
class ServerConfig:
    """Immutable server configuration with clear defaults."""

    provider: str
    team_mode: str = DefaultValues.DEFAULT_TEAM_MODE
    log_level: str = DefaultValues.DEFAULT_LOG_LEVEL
    max_retries: int = DefaultValues.DEFAULT_MAX_RETRIES
    timeout: float = DefaultTimeouts.PROCESSING_TIMEOUT

    @classmethod
    def from_environment(cls) -> "ServerConfig":
        """Create config from environment with sensible defaults."""
        return cls(
            provider=os.environ.get("LLM_PROVIDER", DefaultValues.DEFAULT_LLM_PROVIDER),
            team_mode=os.environ.get(
                "TEAM_MODE", DefaultValues.DEFAULT_TEAM_MODE
            ).lower(),
            log_level=os.environ.get("LOG_LEVEL", DefaultValues.DEFAULT_LOG_LEVEL),
            max_retries=int(
                os.environ.get("MAX_RETRIES", str(DefaultValues.DEFAULT_MAX_RETRIES))
            ),
            timeout=float(
                os.environ.get("TIMEOUT", str(DefaultValues.DEFAULT_TIMEOUT))
            ),
        )


class ServerInitializer(ABC):
    """Abstract initializer for server components."""

    @abstractmethod
    async def initialize(self, config: ServerConfig) -> None:
        """Initialize server component."""

    @abstractmethod
    async def cleanup(self) -> None:
        """Clean up server component."""


class EnvironmentInitializer(ServerInitializer):
    """Handles environment validation and setup."""

    async def initialize(self, config: ServerConfig) -> None:
        """Validate environment requirements with enhanced error handling."""
        logger.info(f"Initializing environment with {config.provider} provider")

        try:
            # Graceful degradation prevents startup failures from missing optional keys
            missing_keys = check_required_api_keys()
            if missing_keys:
                logger.warning(f"Missing API keys: {', '.join(missing_keys)}")
                # Note: Don't fail here as some providers might not require keys

            # Ensure log directory exists
            log_dir = Path.home() / ".sequential_thinking" / "logs"
            if not log_dir.exists():
                logger.info(f"Creating log directory: {log_dir}")
                try:
                    log_dir.mkdir(parents=True, exist_ok=True)
                except OSError as e:
                    raise ConfigurationError(
                        f"Failed to create log directory {log_dir}: {e}"
                    ) from e

        except Exception as e:
            if not isinstance(e, ConfigurationError):
                raise ConfigurationError(
                    f"Environment initialization failed: {e}"
                ) from e
            raise

    async def cleanup(self) -> None:
        """No cleanup needed for environment."""


class ServerState:
    """Manages server state with proper lifecycle and separation of concerns."""

    def __init__(self) -> None:
        self._config: ServerConfig | None = None
        self._session: SessionMemory | None = None
        self._initializers = [
            EnvironmentInitializer(),
        ]

    async def initialize(self, config: ServerConfig) -> None:
        """Initialize all server components."""
        self._config = config

        # Ordered initialization prevents dependency conflicts
        for initializer in self._initializers:
            await initializer.initialize(config)

        # Session-based architecture simplifies state management
        self._session = SessionMemory()

        logger.info(
            "Server state initialized successfully with multi-thinking workflow"
        )

    async def cleanup(self) -> None:
        """Clean up all server components."""
        # Clean up in reverse order
        for initializer in reversed(self._initializers):
            await initializer.cleanup()

        self._config = None
        self._session = None

        logger.info("Server state cleaned up")

    @property
    def config(self) -> ServerConfig:
        """Get current configuration."""
        if self._config is None:
            raise RuntimeError("Server not initialized - config unavailable")
        return self._config

    @property
    def session(self) -> SessionMemory:
        """Get current session."""
        if self._session is None:
            raise RuntimeError("Server not initialized - session unavailable")
        return self._session


# Remove redundant exception definition as it's now in types.py


class ThoughtProcessor:
    """Simplified thought processor that delegates to the refactored implementation.

    This maintains backward compatibility while using the new service-based architecture.
    """

    def __init__(self, session: SessionMemory) -> None:
        # Import and delegate to the refactored implementation
        from .thought_processor_refactored import (
            ThoughtProcessor as RefactoredProcessor,
        )

        self._processor = RefactoredProcessor(session)

    async def process_thought(self, thought_data: ThoughtData) -> str:
        """Process a thought through the team with comprehensive error handling."""
        return await self._processor.process_thought(thought_data)

    # Legacy methods for backward compatibility - delegate to refactored processor
    def _extract_response_content(self, response) -> str:
        """Legacy method - delegates to refactored processor."""
        return self._processor._extract_response_content(response)

    def _build_context_prompt(self, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to refactored processor."""
        return self._processor._build_context_prompt(thought_data)

    def _format_response(self, content: str, thought_data: ThoughtData) -> str:
        """Legacy method - delegates to refactored processor."""
        return self._processor._format_response(content, thought_data)

    async def _execute_single_agent_processing(
        self, input_prompt: str, routing_decision=None
    ) -> str:
        """Legacy method - delegates to refactored processor."""
        return await self._processor._execute_single_agent_processing(
            input_prompt, routing_decision
        )

    async def _execute_team_processing(self, input_prompt: str) -> str:
        """Legacy method - delegates to refactored processor."""
        return await self._processor._execute_team_processing(input_prompt)


@asynccontextmanager
async def create_server_lifespan() -> AsyncIterator[ServerState]:
    """Create server lifespan context manager with proper resource management."""
    config = ServerConfig.from_environment()
    server_state = ServerState()

    try:
        await server_state.initialize(config)
        logger.info("Server started successfully")
        yield server_state

    except Exception as e:
        logger.error(f"Server initialization failed: {e}", exc_info=True)
        raise ServerInitializationError(f"Failed to initialize server: {e}") from e

    finally:
        await server_state.cleanup()
        logger.info("Server shutdown complete")


class ServerInitializationError(Exception):
    """Custom exception for server initialization failures."""


def create_validated_thought_data(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: bool,
    branchFromThought: int | None,
    branchId: str | None,
    needsMoreThoughts: bool,
) -> ThoughtData:
    """Create and validate thought data with enhanced error reporting."""
    try:
        return ThoughtData(
            thought=thought.strip(),
            thoughtNumber=thoughtNumber,
            totalThoughts=totalThoughts,
            nextThoughtNeeded=nextThoughtNeeded,
            isRevision=isRevision,
            branchFromThought=branchFromThought,
            branchId=branchId.strip() if branchId else None,
            needsMoreThoughts=needsMoreThoughts,
        )
    except ValidationError as e:
        raise ValueError(f"Invalid thought data: {e}") from e


# Global server state with workflow support
_server_state: ServerState | None = None
_thought_processor: ThoughtProcessor | None = None


async def get_thought_processor() -> ThoughtProcessor:
    """Get or create the global thought processor with workflow support."""
    global _thought_processor, _server_state

    if _thought_processor is None:
        if _server_state is None:
            raise RuntimeError(
                "Server not properly initialized - _server_state is None. Ensure app_lifespan is running."
            )

        logger.info("Initializing ThoughtProcessor with multi-thinking workflow")
        _thought_processor = ThoughtProcessor(_server_state.session)

    return _thought_processor

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/config/modernized_config.py:
--------------------------------------------------------------------------------

```python
"""Modernized configuration management with dependency injection and clean abstractions.

Clean configuration management with modern Python patterns.
"""

import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable

from agno.models.anthropic import Claude
from agno.models.base import Model
from agno.models.deepseek import DeepSeek
from agno.models.groq import Groq
from agno.models.ollama import Ollama
from agno.models.openai import OpenAIChat
from agno.models.openrouter import OpenRouter


class GitHubOpenAI(OpenAIChat):
    """OpenAI provider configured for GitHub Models API with enhanced validation."""

    @staticmethod
    def _validate_github_token(token: str) -> None:
        """Validate GitHub token format with comprehensive security checks."""
        if not token:
            raise ValueError("GitHub token is required but not provided")

        # Strip whitespace and validate basic format
        token = token.strip()

        # Valid GitHub token prefixes with their expected lengths
        token_specs = {
            "ghp_": 40,  # Classic personal access token
            "github_pat_": lambda t: len(t) >= 93,  # Fine-grained PAT (variable length)
            "gho_": 40,  # OAuth app token
            "ghu_": 40,  # User-to-server token
            "ghs_": 40,  # Server-to-server token
        }

        # Check token prefix and length
        valid_token = False
        for prefix, expected_length in token_specs.items():
            if token.startswith(prefix):
                if callable(expected_length):
                    if expected_length(token):
                        valid_token = True
                        break
                elif len(token) == expected_length:
                    valid_token = True
                    break

        if not valid_token:
            raise ValueError(
                "Invalid GitHub token format. Token must be a valid GitHub personal "
                "access token, "
                "OAuth token, or fine-grained personal access token with correct "
                "prefix and length."
            )

        # Enhanced entropy validation to prevent fake tokens
        token_body = (
            token[4:]
            if token.startswith("ghp_")
            else token.split("_", 1)[1]
            if "_" in token
            else token
        )

        # Check for minimum entropy (character diversity)
        unique_chars = len(set(token_body.lower()))
        if unique_chars < 15:  # GitHub tokens should have high entropy
            raise ValueError(
                "GitHub token appears to have insufficient entropy. Please ensure "
                "you're using a real GitHub token."
            )

        # Check for obvious fake patterns
        fake_patterns = ["test", "fake", "demo", "example", "placeholder", "your_token"]
        if any(pattern in token.lower() for pattern in fake_patterns):
            raise ValueError(
                "GitHub token appears to be a placeholder or test value. Please "
                "use a real GitHub token."
            )

    def __init__(self, **kwargs: Any) -> None:
        # Set GitHub Models configuration
        kwargs.setdefault("base_url", "https://models.github.ai/inference")

        if "api_key" not in kwargs:
            kwargs["api_key"] = os.environ.get("GITHUB_TOKEN")

        api_key = kwargs.get("api_key")
        if not api_key:
            raise ValueError(
                "GitHub token is required but not found in GITHUB_TOKEN "
                "environment variable"
            )

        if isinstance(api_key, str):
            self._validate_github_token(api_key)
        super().__init__(**kwargs)


@dataclass(frozen=True)
class ModelConfig:
    """Immutable configuration for model provider and settings."""

    provider_class: type[Model]
    enhanced_model_id: str
    standard_model_id: str
    api_key: str | None = None

    def create_enhanced_model(self) -> Model:
        """Create enhanced model instance (used for complex synthesis like Blue Hat)."""
        # Enable prompt caching for Anthropic models
        if self.provider_class == Claude:
            return self.provider_class(
                id=self.enhanced_model_id,
                # Note: cache_system_prompt removed - not available in current Agno version
            )
        return self.provider_class(id=self.enhanced_model_id)

    def create_standard_model(self) -> Model:
        """Create standard model instance (used for individual hat processing)."""
        # Enable prompt caching for Anthropic models
        if self.provider_class == Claude:
            return self.provider_class(
                id=self.standard_model_id,
                # Note: cache_system_prompt removed - not available in current Agno version
            )
        return self.provider_class(id=self.standard_model_id)


@runtime_checkable
class ConfigurationStrategy(Protocol):
    """Protocol defining configuration strategy interface."""

    def get_config(self) -> ModelConfig:
        """Return model configuration for this strategy."""
        ...

    def get_required_environment_variables(self) -> dict[str, bool]:
        """Return required environment variables and whether they're optional."""
        ...


class BaseProviderStrategy(ABC):
    """Abstract base strategy with common functionality."""

    @property
    @abstractmethod
    def provider_class(self) -> type[Model]:
        """Return the provider model class."""

    @property
    @abstractmethod
    def default_enhanced_model(self) -> str:
        """Return default enhanced model ID (for complex synthesis)."""

    @property
    @abstractmethod
    def default_standard_model(self) -> str:
        """Return default standard model ID (for individual processing)."""

    @property
    @abstractmethod
    def api_key_name(self) -> str | None:
        """Return API key environment variable name."""

    def _get_env_with_fallback(self, env_var: str, fallback: str) -> str:
        """Get environment variable with fallback to default."""
        value = os.environ.get(env_var, "").strip()
        return value if value else fallback

    def get_config(self) -> ModelConfig:
        """Get complete configuration with environment overrides."""
        prefix = self.__class__.__name__.replace("Strategy", "").upper()

        enhanced_model = self._get_env_with_fallback(
            f"{prefix}_ENHANCED_MODEL_ID", self.default_enhanced_model
        )
        standard_model = self._get_env_with_fallback(
            f"{prefix}_STANDARD_MODEL_ID", self.default_standard_model
        )

        # Get API key with enhanced validation and None handling
        api_key: str | None = None
        if self.api_key_name:
            api_key = os.environ.get(self.api_key_name, "").strip()
            api_key = api_key if api_key else None

        return ModelConfig(
            provider_class=self.provider_class,
            enhanced_model_id=enhanced_model,
            standard_model_id=standard_model,
            api_key=api_key,
        )

    def get_required_environment_variables(self) -> dict[str, bool]:
        """Return required environment variables."""
        env_vars: dict[str, bool] = {}

        if self.api_key_name:
            env_vars[self.api_key_name] = False  # Required

        # Enhanced/standard environment variables (optional)
        prefix = self.__class__.__name__.replace("Strategy", "").upper()
        env_vars[f"{prefix}_ENHANCED_MODEL_ID"] = True  # Optional
        env_vars[f"{prefix}_STANDARD_MODEL_ID"] = True  # Optional

        return env_vars


# Concrete strategy implementations
class DeepSeekStrategy(BaseProviderStrategy):
    """DeepSeek provider strategy."""

    provider_class = DeepSeek
    default_enhanced_model = "deepseek-chat"
    default_standard_model = "deepseek-chat"
    api_key_name = "DEEPSEEK_API_KEY"


class GroqStrategy(BaseProviderStrategy):
    """Groq provider strategy."""

    provider_class = Groq
    default_enhanced_model = "openai/gpt-oss-120b"
    default_standard_model = "openai/gpt-oss-20b"
    api_key_name = "GROQ_API_KEY"


class OpenRouterStrategy(BaseProviderStrategy):
    """OpenRouter provider strategy."""

    provider_class = OpenRouter
    default_enhanced_model = "deepseek/deepseek-chat-v3-0324"
    default_standard_model = "deepseek/deepseek-r1"
    api_key_name = "OPENROUTER_API_KEY"


class OllamaStrategy(BaseProviderStrategy):
    """Ollama provider strategy (no API key required)."""

    provider_class = Ollama
    default_enhanced_model = "devstral:24b"
    default_standard_model = "devstral:24b"
    api_key_name = None


class GitHubStrategy(BaseProviderStrategy):
    """GitHub Models provider strategy."""

    @property
    def provider_class(self) -> type[Model]:
        return GitHubOpenAI

    @property
    def default_enhanced_model(self) -> str:
        return "openai/gpt-5"

    @property
    def default_standard_model(self) -> str:
        return "openai/gpt-5-min"

    @property
    def api_key_name(self) -> str:
        return "GITHUB_TOKEN"


class AnthropicStrategy(BaseProviderStrategy):
    """Anthropic Claude provider strategy with prompt caching enabled."""

    @property
    def provider_class(self) -> type[Model]:
        return Claude

    @property
    def default_enhanced_model(self) -> str:
        return "claude-3-5-sonnet-20241022"

    @property
    def default_standard_model(self) -> str:
        return "claude-3-5-haiku-20241022"

    @property
    def api_key_name(self) -> str:
        return "ANTHROPIC_API_KEY"


class ConfigurationManager:
    """Manages configuration strategies with dependency injection."""

    def __init__(self) -> None:
        self._strategies = {
            "deepseek": DeepSeekStrategy(),
            "groq": GroqStrategy(),
            "openrouter": OpenRouterStrategy(),
            "ollama": OllamaStrategy(),
            "github": GitHubStrategy(),
            "anthropic": AnthropicStrategy(),
        }
        self._default_strategy = "deepseek"

    def register_strategy(self, name: str, strategy: ConfigurationStrategy) -> None:
        """Register a new configuration strategy."""
        self._strategies[name] = strategy

    def get_strategy(self, provider_name: str | None = None) -> ConfigurationStrategy:
        """Get strategy for specified provider."""
        if provider_name is None:
            provider_name = os.environ.get("LLM_PROVIDER", self._default_strategy)

        provider_name = provider_name.lower()

        if provider_name not in self._strategies:
            available = list(self._strategies.keys())
            raise ValueError(
                f"Unknown provider: {provider_name}. Available: {available}"
            )

        return self._strategies[provider_name]

    def get_model_config(self, provider_name: str | None = None) -> ModelConfig:
        """Get model configuration using dependency injection."""
        strategy = self.get_strategy(provider_name)
        return strategy.get_config()

    def validate_environment(self, provider_name: str | None = None) -> dict[str, str]:
        """Validate environment variables and return missing required ones."""
        strategy = self.get_strategy(provider_name)
        required_vars = strategy.get_required_environment_variables()

        missing: dict[str, str] = {}
        for var_name, is_optional in required_vars.items():
            if not is_optional and not os.environ.get(var_name):
                missing[var_name] = "Required but not set"

        # Check EXA API key for research functionality (optional)
        # Note: EXA tools will be disabled if key is not provided
        exa_key = os.environ.get("EXA_API_KEY")
        if not exa_key:
            # Don't fail startup - just log warning that research will be disabled
            import logging

            logging.getLogger(__name__).warning(
                "EXA_API_KEY not found. Research tools will be disabled."
            )

        return missing

    def get_available_providers(self) -> list[str]:
        """Get list of available provider names."""
        return list(self._strategies.keys())


# Singleton instance for dependency injection
_config_manager = ConfigurationManager()


# Public API functions
def get_model_config(provider_name: str | None = None) -> ModelConfig:
    """Get model configuration using modernized configuration management."""
    return _config_manager.get_model_config(provider_name)


def check_required_api_keys(provider_name: str | None = None) -> list[str]:
    """Check for missing required API keys."""
    missing_vars = _config_manager.validate_environment(provider_name)
    return list(missing_vars.keys())


def register_provider_strategy(name: str, strategy: ConfigurationStrategy) -> None:
    """Register a custom provider strategy."""
    _config_manager.register_strategy(name, strategy)


def get_available_providers() -> list[str]:
    """Get list of available providers."""
    return _config_manager.get_available_providers()

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/agno_workflow_router.py:
--------------------------------------------------------------------------------

```python
"""Multi-Thinking Workflow Router - Complete Rewrite.

纯净的多向思维工作流实现, 基于Agno v2.0框架。
完全移除旧的复杂度路由, 专注于多向思维方法论。
"""

import logging
import re
import time
from dataclasses import dataclass
from typing import Any

from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow

from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config

# Import at module level - moved from function to avoid PLC0415
from mcp_server_mas_sequential_thinking.core.models import ThoughtData
from mcp_server_mas_sequential_thinking.processors.multi_thinking_processor import (
    MultiThinkingSequentialProcessor,
    create_multi_thinking_step_output,
)

logger = logging.getLogger(__name__)


@dataclass
class MultiThinkingWorkflowResult:
    """Result from Multi-Thinking workflow execution."""

    content: str
    strategy_used: str
    processing_time: float
    complexity_score: float
    step_name: str
    thinking_sequence: list[str]
    cost_reduction: float


class MultiThinkingWorkflowRouter:
    """Pure Multi-Thinking workflow router using Agno v2.0."""

    def __init__(self) -> None:
        """Initialize Multi-Thinking workflow router."""
        self.model_config = get_model_config()

        # Initialize Multi-Thinking processor
        self.multi_thinking_processor = MultiThinkingSequentialProcessor()

        # Create Multi-Thinking processing step
        self.multi_thinking_step = self._create_multi_thinking_step()

        # Create router that always selects Multi-Thinking
        self.router = Router(
            name="multi_thinking_router",
            selector=self._multi_thinking_selector,
            choices=[self.multi_thinking_step],
        )

        # Create main workflow
        self.workflow = Workflow(
            name="multi_thinking_workflow",
            steps=[self.router],
        )

        logger.info("Multi-Thinking Workflow Router initialized")

    def _multi_thinking_selector(self, step_input: StepInput) -> list[Step]:
        """Selector that always returns Multi-Thinking processing."""
        try:
            logger.info("🎩 MULTI-THINKING WORKFLOW ROUTING:")

            # Extract thought content for logging
            if isinstance(step_input.input, dict):
                thought_content = step_input.input.get("thought", "")
                thought_number = step_input.input.get("thought_number", 1)
                total_thoughts = step_input.input.get("total_thoughts", 1)
            else:
                thought_content = str(step_input.input)
                thought_number = 1
                total_thoughts = 1

            logger.info(
                "  📝 Input: %s%s",
                thought_content[:100],
                "..." if len(thought_content) > 100 else "",
            )
            logger.info("  🔢 Progress: %s/%s", thought_number, total_thoughts)
            logger.info("  ✅ Multi-Thinking selected - exclusive thinking methodology")

            return [self.multi_thinking_step]

        except Exception as e:
            logger.exception("Error in Multi-Thinking selector: %s", e)
            logger.warning("Continuing with Multi-Thinking processing despite error")
            return [self.multi_thinking_step]

    def _create_multi_thinking_step(self) -> Step:
        """Create Six Thinking Hats processing step."""

        async def multi_thinking_executor(
            step_input: StepInput, session_state: dict[str, Any]
        ) -> StepOutput:
            """Execute Multi-Thinking thinking process."""
            try:
                logger.info("🎩 MULTI-THINKING STEP EXECUTION:")

                # Extract thought content and metadata
                if isinstance(step_input.input, dict):
                    thought_content = step_input.input.get(
                        "thought", str(step_input.input)
                    )
                    thought_number = step_input.input.get("thought_number", 1)
                    total_thoughts = step_input.input.get("total_thoughts", 1)
                    context = step_input.input.get("context", "")
                else:
                    thought_content = str(step_input.input)
                    thought_number = 1
                    total_thoughts = 1
                    context = ""

                # ThoughtData is now imported at module level

                # Extract context preservation fields from input if available
                if isinstance(step_input.input, dict):
                    is_revision = step_input.input.get("isRevision", False)
                    branch_from_thought = step_input.input.get("branchFromThought")
                    branch_id = step_input.input.get("branchId")
                    needs_more_thoughts = step_input.input.get(
                        "needsMoreThoughts", False
                    )
                    next_thought_needed = step_input.input.get(
                        "nextThoughtNeeded", True
                    )
                else:
                    is_revision = False
                    branch_from_thought = None
                    branch_id = None
                    needs_more_thoughts = False
                    next_thought_needed = True

                thought_data = ThoughtData(
                    thought=thought_content,
                    thoughtNumber=thought_number,
                    totalThoughts=total_thoughts,
                    nextThoughtNeeded=next_thought_needed,
                    isRevision=is_revision,
                    branchFromThought=branch_from_thought,
                    branchId=branch_id,
                    needsMoreThoughts=needs_more_thoughts,
                )

                logger.info("  📝 Input: %s...", thought_content[:100])
                logger.info("  🔢 Thought: %s/%s", thought_number, total_thoughts)

                # Process with Multi-Thinking
                result = (
                    await self.multi_thinking_processor.process_with_multi_thinking(
                        thought_data, context
                    )
                )

                # Store metadata in session_state
                session_state["current_strategy"] = result.strategy_used
                session_state["current_complexity_score"] = result.complexity_score
                session_state["thinking_sequence"] = result.thinking_sequence
                session_state["cost_reduction"] = result.cost_reduction

                logger.info("  ✅ Multi-Thinking completed: %s", result.strategy_used)
                logger.info("  📊 Complexity: %.1f", result.complexity_score)
                logger.info("  💰 Cost Reduction: %.1f%%", result.cost_reduction)

                return create_multi_thinking_step_output(result)

            except Exception as e:
                logger.exception("  ❌ Multi-Thinking execution failed")
                return StepOutput(
                    content=f"Multi-Thinking processing failed: {e!s}",
                    success=False,
                    error=str(e),
                    step_name="multi_thinking_error",
                )

        return Step(
            name="multi_thinking_processing",
            executor=multi_thinking_executor,
            description="Six Thinking Hats sequential processing",
        )

    async def process_thought_workflow(
        self, thought_data: "ThoughtData", context_prompt: str
    ) -> MultiThinkingWorkflowResult:
        """Process thought using Multi-Thinking workflow."""
        start_time = time.time()

        try:
            logger.info("🚀 MULTI-THINKING WORKFLOW INITIALIZATION:")
            logger.info(
                "  📝 Thought: %s%s",
                thought_data.thought[:100],
                "..." if len(thought_data.thought) > 100 else "",
            )
            logger.info(
                "  🔢 Thought Number: %s/%s",
                thought_data.thoughtNumber,
                thought_data.totalThoughts,
            )
            logger.info("  📋 Context Length: %d chars", len(context_prompt))
            logger.info(
                "  ⏰ Start Time: %s",
                time.strftime("%H:%M:%S", time.localtime(start_time)),
            )

            # Prepare workflow input for Multi-Thinking
            workflow_input = {
                "thought": thought_data.thought,
                "thought_number": thought_data.thoughtNumber,
                "total_thoughts": thought_data.totalThoughts,
                "context": context_prompt,
            }

            logger.info("📦 WORKFLOW INPUT PREPARATION:")
            logger.info("  📊 Input Keys: %s", list(workflow_input.keys()))
            logger.info("  📏 Input Size: %d chars", len(str(workflow_input)))

            # Initialize session_state for metadata tracking
            session_state: dict[str, Any] = {
                "start_time": start_time,
                "thought_number": thought_data.thoughtNumber,
                "total_thoughts": thought_data.totalThoughts,
            }

            logger.info("🎯 SESSION STATE SETUP:")
            logger.info("  🔑 State Keys: %s", list(session_state.keys()))
            logger.info("  📈 Metadata: %s", session_state)

            logger.info(
                "▶️  EXECUTING Multi-Thinking workflow for thought #%s",
                thought_data.thoughtNumber,
            )

            # Execute Multi-Thinking workflow
            logger.info("🔄 WORKFLOW EXECUTION START...")
            result = await self.workflow.arun(
                input=workflow_input, session_state=session_state
            )
            logger.info("✅ WORKFLOW EXECUTION COMPLETED")

            processing_time = time.time() - start_time

            # Extract clean content from result
            content = self._extract_clean_content(result)

            logger.info("📋 CONTENT VALIDATION:")
            logger.info(
                "  ✅ Content extracted successfully: %d characters", len(content)
            )
            logger.info(
                "  📝 Content preview: %s%s",
                content[:150],
                "..." if len(content) > 150 else "",
            )

            # Get metadata from session_state
            complexity_score = session_state.get("current_complexity_score", 0.0)
            strategy_used = session_state.get("current_strategy", "multi_thinking")
            thinking_sequence = session_state.get("thinking_sequence", [])
            cost_reduction = session_state.get("cost_reduction", 0.0)

            logger.info("📊 WORKFLOW RESULT COMPILATION:")
            logger.info("  🎯 Strategy used: %s", strategy_used)
            logger.info("  🧠 Thinking sequence: %s", " → ".join(thinking_sequence))
            logger.info("  📈 Complexity score: %.1f", complexity_score)
            logger.info("  💰 Cost reduction: %.1f%%", cost_reduction)
            logger.info("  ⏱️  Processing time: %.3fs", processing_time)

            workflow_result = MultiThinkingWorkflowResult(
                content=content,
                strategy_used=strategy_used,
                processing_time=processing_time,
                complexity_score=complexity_score,
                step_name="multi_thinking_execution",
                thinking_sequence=thinking_sequence,
                cost_reduction=cost_reduction,
            )

            logger.info("🎉 MULTI-THINKING WORKFLOW COMPLETION:")
            logger.info(
                "  ✅ Completed: strategy=%s, time=%.3fs, score=%.1f, reduction=%.1f%%",
                strategy_used,
                processing_time,
                complexity_score,
                cost_reduction,
            )

            return workflow_result

        except Exception as e:
            processing_time = time.time() - start_time
            logger.exception(
                "Multi-Thinking workflow execution failed after %.3fs", processing_time
            )

            return MultiThinkingWorkflowResult(
                content=f"Error processing thought with Multi-Thinking: {e!s}",
                strategy_used="error_fallback",
                processing_time=processing_time,
                complexity_score=0.0,
                step_name="error_handling",
                thinking_sequence=[],
                cost_reduction=0.0,
            )

    def _extract_clean_content(self, result: Any) -> str:
        """Extract clean content from workflow result."""

        def extract_recursive(obj: Any, depth: int = 0) -> str:
            """Recursively extract clean content from nested objects."""
            if depth > 10:  # Prevent infinite recursion
                return str(obj)

            # Handle dictionary with common content keys
            if isinstance(obj, dict):
                for key in [
                    "result",
                    "content",
                    "message",
                    "text",
                    "response",
                    "output",
                    "answer",
                ]:
                    if key in obj:
                        return extract_recursive(obj[key], depth + 1)
                # Fallback to any string content
                for value in obj.values():
                    if isinstance(value, str) and len(value.strip()) > 10:
                        return value.strip()
                return str(obj)

            # Handle objects with content attributes
            if hasattr(obj, "content"):
                content = obj.content
                if isinstance(content, str):
                    return content.strip()
                return extract_recursive(content, depth + 1)

            # Handle other output objects
            if hasattr(obj, "output"):
                return extract_recursive(obj.output, depth + 1)

            # Handle list/tuple - extract first meaningful content
            if isinstance(obj, (list, tuple)) and obj:
                for item in obj:
                    result = extract_recursive(item, depth + 1)
                    if isinstance(result, str) and len(result.strip()) > 10:
                        return result.strip()

            # If it's already a string, clean it up
            if isinstance(obj, str):
                content = obj.strip()

                # Remove object representations
                if any(
                    content.startswith(pattern)
                    for pattern in [
                        "RunOutput(",
                        "TeamRunOutput(",
                        "StepOutput(",
                        "WorkflowResult(",
                        "{'result':",
                        '{"result":',
                        "{'content':",
                        '{"content":',
                    ]
                ):
                    # Try to extract content using regex (re imported at module level)

                    patterns = [
                        (r"content='([^']*)'", 1),
                        (r'content="([^"]*)"', 1),
                        (r"'result':\s*'([^']*)'", 1),
                        (r'"result":\s*"([^"]*)"', 1),
                        (r"'([^']{20,})'", 1),
                        (r'"([^"]{20,})"', 1),
                    ]

                    for pattern, group in patterns:
                        match = re.search(pattern, content)
                        if match:
                            extracted = match.group(group).strip()
                            if len(extracted) > 10:
                                return extracted

                    # Clean up object syntax
                    cleaned = re.sub(r'[{}()"\']', " ", content)
                    cleaned = re.sub(
                        r"\b(RunOutput|TeamRunOutput|StepOutput|content|result|success|error)\b",
                        " ",
                        cleaned,
                    )
                    cleaned = re.sub(r"\s+", " ", cleaned).strip()

                    if len(cleaned) > 20:
                        return cleaned

                return content

            # Fallback
            result = str(obj).strip()
            if len(result) > 20 and not any(
                result.startswith(pattern)
                for pattern in ["RunOutput(", "TeamRunOutput(", "StepOutput(", "<"]
            ):
                return result

            return "Multi-Thinking processing completed successfully"

        return extract_recursive(result)


# For backward compatibility with the old AgnoWorkflowRouter name
AgnoWorkflowRouter = MultiThinkingWorkflowRouter
WorkflowResult = MultiThinkingWorkflowResult

```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/infrastructure/persistent_memory.py:
--------------------------------------------------------------------------------

```python
"""Persistent memory management with SQLAlchemy and memory pruning."""

import os
from datetime import datetime, timedelta
from pathlib import Path

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    Index,
    Integer,
    String,
    Text,
    create_engine,
    desc,
)
from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker
from sqlalchemy.pool import StaticPool

from mcp_server_mas_sequential_thinking.config.constants import (
    DatabaseConstants,
    DefaultSettings,
)
from mcp_server_mas_sequential_thinking.core.models import ThoughtData

from .logging_config import get_logger

logger = get_logger(__name__)


class Base(DeclarativeBase):
    """Base class for SQLAlchemy models with modern typing support."""


class SessionRecord(Base):
    """Database model for session storage."""

    __tablename__ = "sessions"

    id = Column(String, primary_key=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    total_thoughts = Column(Integer, default=0)
    total_cost = Column(Float, default=0.0)
    provider = Column(String, default="deepseek")

    # Relationship to thoughts
    thoughts = relationship(
        "ThoughtRecord", back_populates="session", cascade="all, delete-orphan"
    )

    # Index for performance
    __table_args__ = (
        Index("ix_session_created", "created_at"),
        Index("ix_session_updated", "updated_at"),
    )


class ThoughtRecord(Base):
    """Database model for thought storage."""

    __tablename__ = "thoughts"

    id = Column(Integer, primary_key=True)
    session_id = Column(String, ForeignKey("sessions.id"), nullable=False)
    thought_number = Column(Integer, nullable=False)
    thought = Column(Text, nullable=False)
    total_thoughts = Column(Integer, nullable=False)
    next_needed = Column(Boolean, nullable=False)
    branch_from = Column(Integer, nullable=True)
    branch_id = Column(String, nullable=True)

    # Processing metadata
    processing_strategy = Column(String, nullable=True)
    complexity_score = Column(Float, nullable=True)
    estimated_cost = Column(Float, nullable=True)
    actual_cost = Column(Float, nullable=True)
    token_usage = Column(Integer, nullable=True)
    processing_time = Column(Float, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    # Response data
    response = Column(Text, nullable=True)
    specialist_used = Column(JSON, nullable=True)  # List of specialists used

    # Relationship
    session = relationship("SessionRecord", back_populates="thoughts")

    # Indexes for performance
    __table_args__ = (
        Index("ix_thought_session", "session_id"),
        Index("ix_thought_number", "thought_number"),
        Index("ix_thought_created", "created_at"),
        Index("ix_thought_branch", "branch_from", "branch_id"),
    )

    def to_thought_data(self) -> ThoughtData:
        """Convert database record to ThoughtData model."""
        return ThoughtData(
            thought=str(self.thought),
            thoughtNumber=int(self.thought_number),
            totalThoughts=int(self.total_thoughts),
            nextThoughtNeeded=bool(self.next_needed),
            isRevision=False,  # Assuming default value is intentional
            branchFromThought=self.branch_from,
            branchId=self.branch_id,
            needsMoreThoughts=True,  # Assuming default value is intentional
        )


class BranchRecord(Base):
    """Database model for branch tracking."""

    __tablename__ = "branches"

    id = Column(Integer, primary_key=True)
    session_id = Column(String, ForeignKey("sessions.id"), nullable=False)
    branch_id = Column(String, nullable=False)
    parent_thought = Column(Integer, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    thought_count = Column(Integer, default=0)

    __table_args__ = (
        Index("ix_branch_session", "session_id"),
        Index("ix_branch_id", "branch_id"),
    )


class UsageMetrics(Base):
    """Database model for usage tracking and cost optimization."""

    __tablename__ = "usage_metrics"

    id = Column(Integer, primary_key=True)
    date = Column(DateTime, default=datetime.utcnow)
    provider = Column(String, nullable=False)
    processing_strategy = Column(String, nullable=False)
    complexity_level = Column(String, nullable=False)

    # Metrics
    thought_count = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)
    total_cost = Column(Float, default=0.0)
    avg_processing_time = Column(Float, default=0.0)
    success_rate = Column(Float, default=1.0)

    # Performance tracking
    token_efficiency = Column(Float, default=0.0)  # quality/token ratio
    cost_effectiveness = Column(Float, default=0.0)  # quality/cost ratio

    __table_args__ = (
        Index("ix_metrics_date", "date"),
        Index("ix_metrics_provider", "provider"),
        Index("ix_metrics_strategy", "processing_strategy"),
    )


class PersistentMemoryManager:
    """Manages persistent storage and memory pruning."""

    def __init__(self, database_url: str | None = None) -> None:
        """Initialize persistent memory manager."""
        # Default to local SQLite database
        if database_url is None:
            db_dir = Path.home() / ".sequential_thinking" / "data"
            db_dir.mkdir(parents=True, exist_ok=True)
            database_url = f"sqlite:///{db_dir}/memory.db"

        # Configure engine with connection pooling
        if database_url.startswith("sqlite"):
            # SQLite-specific configuration
            self.engine = create_engine(
                database_url,
                poolclass=StaticPool,
                connect_args={"check_same_thread": False},
                echo=False,
            )
        else:
            # PostgreSQL/other database configuration
            self.engine = create_engine(
                database_url,
                pool_pre_ping=True,
                pool_size=DatabaseConstants.CONNECTION_POOL_SIZE,
                max_overflow=DatabaseConstants.CONNECTION_POOL_OVERFLOW,
            )

        # Create tables
        Base.metadata.create_all(self.engine)

        # Session factory
        self.SessionLocal = sessionmaker(bind=self.engine)

        logger.info(f"Persistent memory initialized with database: {database_url}")

    def create_session(
        self, session_id: str, provider: str = DefaultSettings.DEFAULT_PROVIDER
    ) -> None:
        """Create a new session record."""
        with self.SessionLocal() as db:
            existing = (
                db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
            )

            if not existing:
                session_record = SessionRecord(id=session_id, provider=provider)
                db.add(session_record)
                db.commit()
                logger.info(f"Created new session: {session_id}")

    def store_thought(
        self,
        session_id: str,
        thought_data: ThoughtData,
        response: str | None = None,
        processing_metadata: dict | None = None,
    ) -> int:
        """Store a thought and return its database ID."""
        with self.SessionLocal() as db:
            session_record = self._ensure_session_exists(db, session_id)
            thought_record = self._create_thought_record(
                session_id, thought_data, response, processing_metadata
            )

            db.add(thought_record)
            self._update_session_stats(session_record, processing_metadata)
            self._handle_branching(db, session_id, thought_data)

            db.commit()
            return int(thought_record.id)

    def _ensure_session_exists(self, db: Session, session_id: str) -> SessionRecord:
        """Ensure session exists in database and return it."""
        session_record = (
            db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
        )

        if not session_record:
            self.create_session(session_id)
            session_record = (
                db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
            )

        return session_record

    def _create_thought_record(
        self,
        session_id: str,
        thought_data: ThoughtData,
        response: str | None,
        processing_metadata: dict | None,
    ) -> ThoughtRecord:
        """Create a thought record with metadata."""
        thought_record = ThoughtRecord(
            session_id=session_id,
            thought_number=thought_data.thoughtNumber,
            thought=thought_data.thought,
            total_thoughts=thought_data.totalThoughts,
            next_needed=thought_data.nextThoughtNeeded,
            branch_from=thought_data.branchFromThought,
            branch_id=thought_data.branchId,
            response=response,
        )

        if processing_metadata:
            self._apply_processing_metadata(thought_record, processing_metadata)

        return thought_record

    def _apply_processing_metadata(
        self, thought_record: ThoughtRecord, metadata: dict
    ) -> None:
        """Apply processing metadata to thought record."""
        if strategy := metadata.get("strategy"):
            thought_record.processing_strategy = str(strategy)  # type: ignore[assignment]
        if complexity_score := metadata.get("complexity_score"):
            thought_record.complexity_score = float(complexity_score)  # type: ignore[assignment]
        if estimated_cost := metadata.get("estimated_cost"):
            thought_record.estimated_cost = float(estimated_cost)  # type: ignore[assignment]
        if actual_cost := metadata.get("actual_cost"):
            thought_record.actual_cost = float(actual_cost)  # type: ignore[assignment]
        if token_usage := metadata.get("token_usage"):
            thought_record.token_usage = int(token_usage)  # type: ignore[assignment]
        if processing_time := metadata.get("processing_time"):
            thought_record.processing_time = float(processing_time)  # type: ignore[assignment]

        thought_record.specialist_used = metadata.get("specialists", [])
        thought_record.processed_at = datetime.utcnow()  # type: ignore[assignment]

    def _update_session_stats(
        self, session_record: SessionRecord, processing_metadata: dict | None
    ) -> None:
        """Update session statistics."""
        if session_record:
            session_record.totalThoughts += 1  # type: ignore[assignment]
            session_record.updated_at = datetime.utcnow()  # type: ignore[assignment]
            if processing_metadata and processing_metadata.get("actual_cost"):
                session_record.total_cost += float(processing_metadata["actual_cost"])  # type: ignore[assignment]

    def _handle_branching(
        self, db: Session, session_id: str, thought_data: ThoughtData
    ) -> None:
        """Handle branch record creation and updates."""
        if not thought_data.branchId:
            return

        branch_record = (
            db.query(BranchRecord)
            .filter(
                BranchRecord.session_id == session_id,
                BranchRecord.branchId == thought_data.branchId,
            )
            .first()
        )

        if not branch_record:
            branch_record = BranchRecord(
                session_id=session_id,
                branch_id=thought_data.branchId,
                parent_thought=thought_data.branchFromThought,
            )
            db.add(branch_record)

        branch_record.thought_count += 1  # type: ignore[assignment]

    def get_session_thoughts(
        self, session_id: str, limit: int | None = None
    ) -> list[ThoughtRecord]:
        """Retrieve thoughts for a session."""
        with self.SessionLocal() as db:
            query = (
                db.query(ThoughtRecord)
                .filter(ThoughtRecord.session_id == session_id)
                .order_by(ThoughtRecord.thoughtNumber)
            )

            if limit:
                query = query.limit(limit)

            return query.all()

    def get_thought_by_number(
        self, session_id: str, thought_number: int
    ) -> ThoughtRecord | None:
        """Get a specific thought by number."""
        with self.SessionLocal() as db:
            return (
                db.query(ThoughtRecord)
                .filter(
                    ThoughtRecord.session_id == session_id,
                    ThoughtRecord.thoughtNumber == thought_number,
                )
                .first()
            )

    def get_branch_thoughts(
        self, session_id: str, branch_id: str
    ) -> list[ThoughtRecord]:
        """Get all thoughts in a specific branch."""
        with self.SessionLocal() as db:
            return (
                db.query(ThoughtRecord)
                .filter(
                    ThoughtRecord.session_id == session_id,
                    ThoughtRecord.branchId == branch_id,
                )
                .order_by(ThoughtRecord.thoughtNumber)
                .all()
            )

    def prune_old_sessions(
        self, older_than_days: int = 30, keep_recent: int = 100
    ) -> int:
        """Prune old sessions to manage storage space."""
        cutoff_date = datetime.utcnow() - timedelta(days=older_than_days)

        with self.SessionLocal() as db:
            # Get sessions older than cutoff, excluding most recent ones
            old_sessions = (
                db.query(SessionRecord)
                .filter(SessionRecord.updated_at < cutoff_date)
                .order_by(desc(SessionRecord.updated_at))
                .offset(keep_recent)
            )

            deleted_count = 0
            for session in old_sessions:
                db.delete(session)  # Cascade will handle thoughts and branches
                deleted_count += 1

            if deleted_count > 0:
                db.commit()
                logger.info(f"Pruned {deleted_count} old sessions")

            return deleted_count

    def get_usage_stats(self, days_back: int = 7) -> dict:
        """Get usage statistics for cost optimization."""
        cutoff_date = datetime.utcnow() - timedelta(days=days_back)

        with self.SessionLocal() as db:
            # Session stats
            session_count = (
                db.query(SessionRecord)
                .filter(SessionRecord.created_at >= cutoff_date)
                .count()
            )

            # Thought stats
            thought_stats = (
                db.query(ThoughtRecord)
                .filter(ThoughtRecord.created_at >= cutoff_date)
                .all()
            )

            total_thoughts = len(thought_stats)
            # Explicit type casting to resolve SQLAlchemy Column type issues
            total_cost: float = float(
                sum(float(t.actual_cost or 0) for t in thought_stats)
            )
            total_tokens: int = int(sum(int(t.token_usage or 0) for t in thought_stats))
            avg_processing_time = sum(
                t.processing_time or 0 for t in thought_stats
            ) / max(total_thoughts, 1)

            # Strategy breakdown
            strategy_stats = {}
            for thought in thought_stats:
                strategy = thought.processing_strategy or "unknown"
                if strategy not in strategy_stats:
                    strategy_stats[strategy] = {"count": 0, "cost": 0, "tokens": 0}
                strategy_stats[strategy]["count"] += 1
                strategy_stats[strategy]["cost"] += thought.actual_cost or 0
                strategy_stats[strategy]["tokens"] += thought.token_usage or 0

            return {
                "period_days": days_back,
                "session_count": session_count,
                "total_thoughts": total_thoughts,
                "total_cost": total_cost,
                "total_tokens": total_tokens,
                "avg_cost_per_thought": total_cost / max(total_thoughts, 1),
                "avg_tokens_per_thought": total_tokens / max(total_thoughts, 1),
                "avg_processing_time": avg_processing_time,
                "strategy_breakdown": strategy_stats,
            }

    def record_usage_metrics(
        self,
        provider: str,
        processing_strategy: str,
        complexity_level: str,
        thought_count: int = 1,
        tokens: int = 0,
        cost: float = 0.0,
        processing_time: float = 0.0,
        success: bool = True,
    ) -> None:
        """Record usage metrics for cost optimization."""
        with self.SessionLocal() as db:
            # Check if we have a record for today
            today = datetime.utcnow().date()
            existing = (
                db.query(UsageMetrics)
                .filter(
                    UsageMetrics.date >= datetime.combine(today, datetime.min.time()),
                    UsageMetrics.provider == provider,
                    UsageMetrics.processing_strategy == processing_strategy,
                    UsageMetrics.complexity_level == complexity_level,
                )
                .first()
            )

            if existing:
                # Update existing record
                existing.thought_count += thought_count  # type: ignore[assignment]
                existing.total_tokens += tokens  # type: ignore[assignment]
                existing.total_cost += cost  # type: ignore[assignment]
                existing.avg_processing_time = (  # type: ignore[assignment]
                    float(existing.avg_processing_time)
                    * (existing.thought_count - thought_count)
                    + processing_time * thought_count
                ) / existing.thought_count
                if not success:
                    existing.success_rate = (  # type: ignore[assignment]
                        float(existing.success_rate)
                        * (existing.thought_count - thought_count)
                    ) / existing.thought_count
            else:
                # Create new record
                metrics = UsageMetrics(
                    provider=provider,
                    processing_strategy=processing_strategy,
                    complexity_level=complexity_level,
                    thought_count=thought_count,
                    total_tokens=tokens,
                    total_cost=cost,
                    avg_processing_time=processing_time,
                    success_rate=1.0 if success else 0.0,
                )
                db.add(metrics)

            db.commit()

    def optimize_database(self) -> None:
        """Run database optimization tasks."""
        with self.SessionLocal() as db:
            from sqlalchemy import text

            if self.engine.dialect.name == "sqlite":
                db.execute(text("VACUUM"))
                db.execute(text("ANALYZE"))
            else:
                db.execute(text("ANALYZE"))
            db.commit()
            logger.info("Database optimization completed")

    def close(self) -> None:
        """Close database connections."""
        self.engine.dispose()


# Convenience functions
def create_persistent_memory(
    database_url: str | None = None,
) -> PersistentMemoryManager:
    """Create a persistent memory manager instance."""
    return PersistentMemoryManager(database_url)


def get_database_url_from_env() -> str:
    """Get database URL from environment variables."""
    if url := os.getenv("DATABASE_URL"):
        return url

    # Default to local SQLite
    db_dir = Path.home() / ".sequential_thinking" / "data"
    db_dir.mkdir(parents=True, exist_ok=True)
    return f"sqlite:///{db_dir}/memory.db"

```
Page 1/2FirstPrevNextLast