#
tokens: 29482/50000 6/39 files (page 2/2)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 2. Use http://codebase.md/fradser/mcp-server-mas-sequential-thinking?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .env.example
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CLAUDE.md
├── Dockerfile
├── pyproject.toml
├── README.md
├── README.zh-CN.md
├── smithery.yaml
├── src
│   └── mcp_server_mas_sequential_thinking
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   ├── constants.py
│       │   └── modernized_config.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── models.py
│       │   ├── session.py
│       │   └── types.py
│       ├── infrastructure
│       │   ├── __init__.py
│       │   ├── logging_config.py
│       │   └── persistent_memory.py
│       ├── main.py
│       ├── processors
│       │   ├── __init__.py
│       │   ├── multi_thinking_core.py
│       │   └── multi_thinking_processor.py
│       ├── routing
│       │   ├── __init__.py
│       │   ├── agno_workflow_router.py
│       │   ├── ai_complexity_analyzer.py
│       │   ├── complexity_types.py
│       │   └── multi_thinking_router.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_builder.py
│       │   ├── processing_orchestrator.py
│       │   ├── response_formatter.py
│       │   ├── response_processor.py
│       │   ├── retry_handler.py
│       │   ├── server_core.py
│       │   ├── thought_processor_refactored.py
│       │   └── workflow_executor.py
│       └── utils
│           ├── __init__.py
│           └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/services/server_core.py:
--------------------------------------------------------------------------------

```python
  1 | """Refactored server core with separated concerns and reduced complexity."""
  2 | 
  3 | import os
  4 | from abc import ABC, abstractmethod
  5 | from collections.abc import AsyncIterator
  6 | from contextlib import asynccontextmanager
  7 | from dataclasses import dataclass
  8 | from pathlib import Path
  9 | from typing import Any
 10 | 
 11 | # Lazy import to break circular dependency
 12 | from pydantic import ValidationError
 13 | 
 14 | from mcp_server_mas_sequential_thinking.config import (
 15 |     DefaultTimeouts,
 16 |     DefaultValues,
 17 |     PerformanceMetrics,
 18 |     check_required_api_keys,
 19 | )
 20 | from mcp_server_mas_sequential_thinking.core import (
 21 |     ConfigurationError,
 22 |     SessionMemory,
 23 |     ThoughtData,
 24 | )
 25 | from mcp_server_mas_sequential_thinking.utils import setup_logging
 26 | 
 27 | logger = setup_logging()
 28 | 
 29 | 
 30 | class LoggingMixin:
 31 |     """Mixin class providing common logging utilities with reduced duplication."""
 32 | 
 33 |     @staticmethod
 34 |     def _log_section_header(
 35 |         title: str, separator_length: int = PerformanceMetrics.SEPARATOR_LENGTH
 36 |     ) -> None:
 37 |         """Log a formatted section header."""
 38 |         logger.info(f"{title}")
 39 | 
 40 |     @staticmethod
 41 |     def _log_metrics_block(title: str, metrics: dict[str, Any]) -> None:
 42 |         """Log a formatted metrics block."""
 43 |         logger.info(f"{title}")
 44 |         for key, value in metrics.items():
 45 |             if isinstance(value, float):
 46 |                 logger.info(f"  {key}: {value:.2f}")
 47 |             elif isinstance(value, (int, str)):
 48 |                 logger.info(f"  {key}: {value}")
 49 |             else:
 50 |                 logger.info(f"  {key}: {value}")
 51 | 
 52 |     @staticmethod
 53 |     def _log_separator(length: int = PerformanceMetrics.SEPARATOR_LENGTH) -> None:
 54 |         """Log a separator line."""
 55 |         logger.info(f"  {'=' * length}")
 56 | 
 57 |     @staticmethod
 58 |     def _calculate_efficiency_score(processing_time: float) -> float:
 59 |         """Calculate efficiency score using standard metrics."""
 60 |         return (
 61 |             PerformanceMetrics.PERFECT_EFFICIENCY_SCORE
 62 |             if processing_time < PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD
 63 |             else max(
 64 |                 PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE,
 65 |                 PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD / processing_time,
 66 |             )
 67 |         )
 68 | 
 69 |     @staticmethod
 70 |     def _calculate_execution_consistency(success_indicator: bool) -> float:
 71 |         """Calculate execution consistency using standard metrics."""
 72 |         return (
 73 |             PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY
 74 |             if success_indicator
 75 |             else PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY
 76 |         )
 77 | 
 78 | 
 79 | @dataclass(frozen=True, slots=True)
 80 | class ServerConfig:
 81 |     """Immutable server configuration with clear defaults."""
 82 | 
 83 |     provider: str
 84 |     team_mode: str = DefaultValues.DEFAULT_TEAM_MODE
 85 |     log_level: str = DefaultValues.DEFAULT_LOG_LEVEL
 86 |     max_retries: int = DefaultValues.DEFAULT_MAX_RETRIES
 87 |     timeout: float = DefaultTimeouts.PROCESSING_TIMEOUT
 88 | 
 89 |     @classmethod
 90 |     def from_environment(cls) -> "ServerConfig":
 91 |         """Create config from environment with sensible defaults."""
 92 |         return cls(
 93 |             provider=os.environ.get("LLM_PROVIDER", DefaultValues.DEFAULT_LLM_PROVIDER),
 94 |             team_mode=os.environ.get(
 95 |                 "TEAM_MODE", DefaultValues.DEFAULT_TEAM_MODE
 96 |             ).lower(),
 97 |             log_level=os.environ.get("LOG_LEVEL", DefaultValues.DEFAULT_LOG_LEVEL),
 98 |             max_retries=int(
 99 |                 os.environ.get("MAX_RETRIES", str(DefaultValues.DEFAULT_MAX_RETRIES))
100 |             ),
101 |             timeout=float(
102 |                 os.environ.get("TIMEOUT", str(DefaultValues.DEFAULT_TIMEOUT))
103 |             ),
104 |         )
105 | 
106 | 
class ServerInitializer(ABC):
    """Interface for a server component with an async setup/teardown pair."""

    @abstractmethod
    async def initialize(self, config: ServerConfig) -> None:
        """Set the component up using ``config``."""

    @abstractmethod
    async def cleanup(self) -> None:
        """Tear the component down."""
117 | 
118 | 
class EnvironmentInitializer(ServerInitializer):
    """Checks API keys and makes sure the log directory exists."""

    async def initialize(self, config: ServerConfig) -> None:
        """Validate the environment for ``config.provider``.

        Missing API keys are only logged as a warning, because some
        providers need no key. The log directory
        ``~/.sequential_thinking/logs`` is created when absent.

        Raises:
            ConfigurationError: if the log directory cannot be created or
                any other step fails; the original exception is chained.
        """
        logger.info(f"Initializing environment with {config.provider} provider")

        try:
            # Check the keys of the provider this config selects, rather
            # than re-reading LLM_PROVIDER from the environment.
            missing_keys = check_required_api_keys(config.provider)
            if missing_keys:
                logger.warning(f"Missing API keys: {', '.join(missing_keys)}")

            log_dir = Path.home() / ".sequential_thinking" / "logs"
            if not log_dir.exists():
                logger.info(f"Creating log directory: {log_dir}")
                try:
                    log_dir.mkdir(parents=True, exist_ok=True)
                except OSError as e:
                    raise ConfigurationError(
                        f"Failed to create log directory {log_dir}: {e}"
                    ) from e

        except ConfigurationError:
            # Already the right type - let it through untouched.
            raise
        except Exception as e:
            raise ConfigurationError(
                f"Environment initialization failed: {e}"
            ) from e

    async def cleanup(self) -> None:
        """No cleanup needed for environment."""
153 | 
154 | 
class ServerState:
    """Holds the active configuration and session for a running server.

    Both are ``None`` until ``initialize`` completes and are reset to
    ``None`` by ``cleanup``.
    """

    def __init__(self) -> None:
        self._config: ServerConfig | None = None
        self._session: SessionMemory | None = None
        self._initializers = [EnvironmentInitializer()]

    async def initialize(self, config: ServerConfig) -> None:
        """Store ``config``, run each initializer in order, then create the session."""
        self._config = config

        for component in self._initializers:
            await component.initialize(config)

        self._session = SessionMemory()

        logger.info(
            "Server state initialized successfully with multi-thinking workflow"
        )

    async def cleanup(self) -> None:
        """Run initializer cleanups in reverse order and drop config and session."""
        for component in self._initializers[::-1]:
            await component.cleanup()

        self._config = None
        self._session = None

        logger.info("Server state cleaned up")

    @property
    def config(self) -> ServerConfig:
        """The active configuration; raises ``RuntimeError`` before initialization."""
        if self._config is not None:
            return self._config
        raise RuntimeError("Server not initialized - config unavailable")

    @property
    def session(self) -> SessionMemory:
        """The active session; raises ``RuntimeError`` before initialization."""
        if self._session is not None:
            return self._session
        raise RuntimeError("Server not initialized - session unavailable")
204 | 
205 | 
206 | # Remove redundant exception definition as it's now in types.py
207 | 
208 | 
class ThoughtProcessor:
    """Thin facade over ``thought_processor_refactored.ThoughtProcessor``.

    Every method forwards to the wrapped processor, so older call sites
    keep working against the service-based implementation.
    """

    def __init__(self, session: SessionMemory) -> None:
        # Imported here rather than at module level.
        from .thought_processor_refactored import (
            ThoughtProcessor as _DelegateProcessor,
        )

        self._processor = _DelegateProcessor(session)

    async def process_thought(self, thought_data: ThoughtData) -> str:
        """Forward ``thought_data`` to the wrapped processor and return its result."""
        result = await self._processor.process_thought(thought_data)
        return result

    # Legacy private entry points, each forwarding to the wrapped processor.
    def _extract_response_content(self, response) -> str:
        """Forward to the wrapped processor's ``_extract_response_content``."""
        delegate = self._processor
        return delegate._extract_response_content(response)

    def _build_context_prompt(self, thought_data: ThoughtData) -> str:
        """Forward to the wrapped processor's ``_build_context_prompt``."""
        delegate = self._processor
        return delegate._build_context_prompt(thought_data)

    def _format_response(self, content: str, thought_data: ThoughtData) -> str:
        """Forward to the wrapped processor's ``_format_response``."""
        delegate = self._processor
        return delegate._format_response(content, thought_data)

    async def _execute_single_agent_processing(
        self, input_prompt: str, routing_decision=None
    ) -> str:
        """Forward to the wrapped processor's ``_execute_single_agent_processing``."""
        result = await self._processor._execute_single_agent_processing(
            input_prompt, routing_decision
        )
        return result

    async def _execute_team_processing(self, input_prompt: str) -> str:
        """Forward to the wrapped processor's ``_execute_team_processing``."""
        result = await self._processor._execute_team_processing(input_prompt)
        return result
251 | 
252 | 
@asynccontextmanager
async def create_server_lifespan() -> AsyncIterator[ServerState]:
    """Async context manager that owns the server state for one run.

    On entry it builds a ``ServerConfig`` from the environment and
    initializes a ``ServerState``, which is yielded to the caller. On exit,
    whether normal or by exception, the state is cleaned up.

    Only failures of the initialization step are reported as
    ``ServerInitializationError``. Exceptions raised inside the
    ``async with`` body propagate unchanged, so runtime errors are not
    mislabelled as startup failures.

    Raises:
        ServerInitializationError: if ``ServerState.initialize`` fails.
    """
    config = ServerConfig.from_environment()
    server_state = ServerState()

    try:
        # Keep the wrapping try narrow: it must not cover the yield, or
        # every error from the running server is thrown back in here and
        # re-typed as an initialization failure.
        try:
            await server_state.initialize(config)
        except Exception as e:
            logger.error(f"Server initialization failed: {e}", exc_info=True)
            raise ServerInitializationError(
                f"Failed to initialize server: {e}"
            ) from e

        logger.info("Server started successfully")
        yield server_state

    finally:
        # Runs after a failed start, a normal shutdown, or a body error.
        await server_state.cleanup()
        logger.info("Server shutdown complete")
271 | 
272 | 
class ServerInitializationError(Exception):
    """Raised when the server cannot be brought up."""
275 | 
276 | 
def create_validated_thought_data(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: bool,
    branchFromThought: int | None,
    branchId: str | None,
    needsMoreThoughts: bool,
) -> ThoughtData:
    """Build a ``ThoughtData`` from raw tool arguments.

    ``thought`` is stripped of surrounding whitespace; ``branchId`` is
    stripped when truthy and replaced by ``None`` otherwise.

    Raises:
        ValueError: if ``ThoughtData`` rejects the values with a pydantic
            ``ValidationError``, which is chained as the cause.
    """
    try:
        payload = {
            "thought": thought.strip(),
            "thoughtNumber": thoughtNumber,
            "totalThoughts": totalThoughts,
            "nextThoughtNeeded": nextThoughtNeeded,
            "isRevision": isRevision,
            "branchFromThought": branchFromThought,
            "branchId": branchId.strip() if branchId else None,
            "needsMoreThoughts": needsMoreThoughts,
        }
        return ThoughtData(**payload)
    except ValidationError as e:
        raise ValueError(f"Invalid thought data: {e}") from e
301 | 
302 | 
303 | # Global server state with workflow support
304 | _server_state: ServerState | None = None
305 | _thought_processor: ThoughtProcessor | None = None
306 | 
307 | 
async def get_thought_processor() -> ThoughtProcessor:
    """Return the module-wide ``ThoughtProcessor``, creating it on first use.

    The processor is built from the session of the module-level
    ``_server_state`` and cached in ``_thought_processor``.

    Raises:
        RuntimeError: if no processor exists yet and ``_server_state`` is
            still ``None``.
    """
    global _thought_processor

    # Fast path: already built.
    if _thought_processor is not None:
        return _thought_processor

    if _server_state is None:
        raise RuntimeError(
            "Server not properly initialized - _server_state is None. Ensure app_lifespan is running."
        )

    logger.info("Initializing ThoughtProcessor with multi-thinking workflow")
    _thought_processor = ThoughtProcessor(_server_state.session)
    return _thought_processor
322 | 
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/config/modernized_config.py:
--------------------------------------------------------------------------------

```python
  1 | """Modernized configuration management with dependency injection and clean abstractions.
  2 | 
  3 | Clean configuration management with modern Python patterns.
  4 | """
  5 | 
  6 | import os
  7 | from abc import ABC, abstractmethod
  8 | from dataclasses import dataclass
  9 | from typing import Any, Protocol, runtime_checkable
 10 | 
 11 | from agno.models.anthropic import Claude
 12 | from agno.models.base import Model
 13 | from agno.models.deepseek import DeepSeek
 14 | from agno.models.groq import Groq
 15 | from agno.models.ollama import Ollama
 16 | from agno.models.openai import OpenAIChat
 17 | from agno.models.openrouter import OpenRouter
 18 | 
 19 | 
 20 | class GitHubOpenAI(OpenAIChat):
 21 |     """OpenAI provider configured for GitHub Models API with enhanced validation."""
 22 | 
 23 |     @staticmethod
 24 |     def _validate_github_token(token: str) -> None:
 25 |         """Validate GitHub token format with comprehensive security checks."""
 26 |         if not token:
 27 |             raise ValueError("GitHub token is required but not provided")
 28 | 
 29 |         # Strip whitespace and validate basic format
 30 |         token = token.strip()
 31 | 
 32 |         # Valid GitHub token prefixes with their expected lengths
 33 |         token_specs = {
 34 |             "ghp_": 40,  # Classic personal access token
 35 |             "github_pat_": lambda t: len(t) >= 93,  # Fine-grained PAT (variable length)
 36 |             "gho_": 40,  # OAuth app token
 37 |             "ghu_": 40,  # User-to-server token
 38 |             "ghs_": 40,  # Server-to-server token
 39 |         }
 40 | 
 41 |         # Check token prefix and length
 42 |         valid_token = False
 43 |         for prefix, expected_length in token_specs.items():
 44 |             if token.startswith(prefix):
 45 |                 if callable(expected_length):
 46 |                     if expected_length(token):
 47 |                         valid_token = True
 48 |                         break
 49 |                 elif len(token) == expected_length:
 50 |                     valid_token = True
 51 |                     break
 52 | 
 53 |         if not valid_token:
 54 |             raise ValueError(
 55 |                 "Invalid GitHub token format. Token must be a valid GitHub personal "
 56 |                 "access token, "
 57 |                 "OAuth token, or fine-grained personal access token with correct "
 58 |                 "prefix and length."
 59 |             )
 60 | 
 61 |         # Enhanced entropy validation to prevent fake tokens
 62 |         token_body = (
 63 |             token[4:]
 64 |             if token.startswith("ghp_")
 65 |             else token.split("_", 1)[1]
 66 |             if "_" in token
 67 |             else token
 68 |         )
 69 | 
 70 |         # Check for minimum entropy (character diversity)
 71 |         unique_chars = len(set(token_body.lower()))
 72 |         if unique_chars < 15:  # GitHub tokens should have high entropy
 73 |             raise ValueError(
 74 |                 "GitHub token appears to have insufficient entropy. Please ensure "
 75 |                 "you're using a real GitHub token."
 76 |             )
 77 | 
 78 |         # Check for obvious fake patterns
 79 |         fake_patterns = ["test", "fake", "demo", "example", "placeholder", "your_token"]
 80 |         if any(pattern in token.lower() for pattern in fake_patterns):
 81 |             raise ValueError(
 82 |                 "GitHub token appears to be a placeholder or test value. Please "
 83 |                 "use a real GitHub token."
 84 |             )
 85 | 
 86 |     def __init__(self, **kwargs: Any) -> None:
 87 |         # Set GitHub Models configuration
 88 |         kwargs.setdefault("base_url", "https://models.github.ai/inference")
 89 | 
 90 |         if "api_key" not in kwargs:
 91 |             kwargs["api_key"] = os.environ.get("GITHUB_TOKEN")
 92 | 
 93 |         api_key = kwargs.get("api_key")
 94 |         if not api_key:
 95 |             raise ValueError(
 96 |                 "GitHub token is required but not found in GITHUB_TOKEN "
 97 |                 "environment variable"
 98 |             )
 99 | 
100 |         if isinstance(api_key, str):
101 |             self._validate_github_token(api_key)
102 |         super().__init__(**kwargs)
103 | 
104 | 
@dataclass(frozen=True)
class ModelConfig:
    """Immutable description of which provider class and model ids to use.

    ``api_key`` is stored on the config but is not passed to the provider
    constructor by the factory methods below.
    """

    provider_class: type[Model]
    enhanced_model_id: str
    standard_model_id: str
    api_key: str | None = None

    def create_enhanced_model(self) -> Model:
        """Instantiate ``provider_class`` with the enhanced model id."""
        # The former Claude-specific branch built exactly the same object
        # (its cache_system_prompt argument had already been removed), so
        # a single construction path is enough.
        return self.provider_class(id=self.enhanced_model_id)

    def create_standard_model(self) -> Model:
        """Instantiate ``provider_class`` with the standard model id."""
        return self.provider_class(id=self.standard_model_id)
133 | 
134 | 
@runtime_checkable
class ConfigurationStrategy(Protocol):
    """Structural interface every provider strategy must satisfy."""

    def get_config(self) -> ModelConfig:
        """Produce the ``ModelConfig`` for this provider."""
        ...

    def get_required_environment_variables(self) -> dict[str, bool]:
        """Map each environment variable name to whether it is optional."""
        ...
146 | 
147 | 
class BaseProviderStrategy(ABC):
    """Common machinery for provider strategies.

    Subclasses supply the provider class, two default model ids and the
    name of the API-key environment variable. This base class layers the
    ``<PREFIX>_ENHANCED_MODEL_ID`` / ``<PREFIX>_STANDARD_MODEL_ID``
    overrides on top, where ``<PREFIX>`` is derived from the class name.
    """

    @property
    @abstractmethod
    def provider_class(self) -> type[Model]:
        """Model class to instantiate for this provider."""

    @property
    @abstractmethod
    def default_enhanced_model(self) -> str:
        """Model id used for enhanced work when no override is set."""

    @property
    @abstractmethod
    def default_standard_model(self) -> str:
        """Model id used for standard work when no override is set."""

    @property
    @abstractmethod
    def api_key_name(self) -> str | None:
        """Name of the API-key environment variable, or ``None`` if keyless."""

    def _env_prefix(self) -> str:
        """Upper-cased class name with every ``Strategy`` occurrence removed."""
        return self.__class__.__name__.replace("Strategy", "").upper()

    def _get_env_with_fallback(self, env_var: str, fallback: str) -> str:
        """Return the stripped value of ``env_var``, or ``fallback`` if unset or blank."""
        stripped = os.environ.get(env_var, "").strip()
        if stripped:
            return stripped
        return fallback

    def get_config(self) -> ModelConfig:
        """Assemble a ``ModelConfig``, applying environment overrides."""
        prefix = self._env_prefix()

        enhanced_id = self._get_env_with_fallback(
            f"{prefix}_ENHANCED_MODEL_ID", self.default_enhanced_model
        )
        standard_id = self._get_env_with_fallback(
            f"{prefix}_STANDARD_MODEL_ID", self.default_standard_model
        )

        # A blank or missing key is normalised to None.
        key_value: str | None = None
        if self.api_key_name:
            key_value = os.environ.get(self.api_key_name, "").strip() or None

        return ModelConfig(
            provider_class=self.provider_class,
            enhanced_model_id=enhanced_id,
            standard_model_id=standard_id,
            api_key=key_value,
        )

    def get_required_environment_variables(self) -> dict[str, bool]:
        """Map each relevant variable to ``True`` if optional, ``False`` if required."""
        variables: dict[str, bool] = {}

        if self.api_key_name:
            variables[self.api_key_name] = False  # required

        prefix = self._env_prefix()
        variables[f"{prefix}_ENHANCED_MODEL_ID"] = True  # optional override
        variables[f"{prefix}_STANDARD_MODEL_ID"] = True  # optional override

        return variables
213 | 
214 | 
215 | # Concrete strategy implementations
class DeepSeekStrategy(BaseProviderStrategy):
    """Strategy for the DeepSeek provider; both tiers default to one model."""

    api_key_name = "DEEPSEEK_API_KEY"
    provider_class = DeepSeek
    default_standard_model = "deepseek-chat"
    default_enhanced_model = "deepseek-chat"
223 | 
224 | 
class GroqStrategy(BaseProviderStrategy):
    """Strategy for the Groq provider."""

    api_key_name = "GROQ_API_KEY"
    provider_class = Groq
    default_standard_model = "openai/gpt-oss-20b"
    default_enhanced_model = "openai/gpt-oss-120b"
232 | 
233 | 
class OpenRouterStrategy(BaseProviderStrategy):
    """Strategy for the OpenRouter provider."""

    api_key_name = "OPENROUTER_API_KEY"
    provider_class = OpenRouter
    default_standard_model = "deepseek/deepseek-r1"
    default_enhanced_model = "deepseek/deepseek-chat-v3-0324"
241 | 
242 | 
class OllamaStrategy(BaseProviderStrategy):
    """Strategy for the Ollama provider, which declares no API-key variable."""

    api_key_name = None
    provider_class = Ollama
    default_standard_model = "devstral:24b"
    default_enhanced_model = "devstral:24b"
250 | 
251 | 
class GitHubStrategy(BaseProviderStrategy):
    """Strategy for GitHub Models, served through ``GitHubOpenAI``."""

    @property
    def provider_class(self) -> type[Model]:
        """Model class used for GitHub Models."""
        return GitHubOpenAI

    @property
    def default_enhanced_model(self) -> str:
        """Default model id for enhanced work."""
        return "openai/gpt-5"

    @property
    def default_standard_model(self) -> str:
        """Default model id for standard work."""
        # Was "openai/gpt-5-min", which looks like a truncated "mini".
        return "openai/gpt-5-mini"

    @property
    def api_key_name(self) -> str:
        """Environment variable holding the GitHub token."""
        return "GITHUB_TOKEN"
270 | 
271 | 
class AnthropicStrategy(BaseProviderStrategy):
    """Anthropic Claude provider strategy.

    This class only supplies the provider class, the default model ids and
    the API-key variable name; it does not configure prompt caching itself.
    """

    @property
    def provider_class(self) -> type[Model]:
        # Model class used for Anthropic.
        return Claude

    @property
    def default_enhanced_model(self) -> str:
        # Default model id for enhanced work.
        return "claude-3-5-sonnet-20241022"

    @property
    def default_standard_model(self) -> str:
        # Default model id for standard work.
        return "claude-3-5-haiku-20241022"

    @property
    def api_key_name(self) -> str:
        # Environment variable holding the Anthropic API key.
        return "ANTHROPIC_API_KEY"
290 | 
291 | 
class ConfigurationManager:
    """Registry of provider strategies keyed by lower-case provider name."""

    def __init__(self) -> None:
        self._strategies = {
            "deepseek": DeepSeekStrategy(),
            "groq": GroqStrategy(),
            "openrouter": OpenRouterStrategy(),
            "ollama": OllamaStrategy(),
            "github": GitHubStrategy(),
            "anthropic": AnthropicStrategy(),
        }
        self._default_strategy = "deepseek"

    @staticmethod
    def _normalize_name(name: str) -> str:
        """Canonical registry key: surrounding whitespace removed, lower-cased."""
        return name.strip().lower()

    def register_strategy(self, name: str, strategy: ConfigurationStrategy) -> None:
        """Register ``strategy`` under ``name``, replacing any existing entry.

        The name is normalised the same way ``get_strategy`` normalises
        lookups; otherwise a strategy registered as ``"MyProvider"`` could
        never be retrieved.
        """
        self._strategies[self._normalize_name(name)] = strategy

    def get_strategy(self, provider_name: str | None = None) -> ConfigurationStrategy:
        """Return the strategy for ``provider_name``.

        When ``provider_name`` is ``None`` the ``LLM_PROVIDER`` environment
        variable is used, falling back to the default strategy.

        Raises:
            ValueError: if no strategy is registered under that name.
        """
        if provider_name is None:
            provider_name = os.environ.get("LLM_PROVIDER", self._default_strategy)

        provider_name = self._normalize_name(provider_name)

        if provider_name not in self._strategies:
            available = list(self._strategies)
            raise ValueError(
                f"Unknown provider: {provider_name}. Available: {available}"
            )

        return self._strategies[provider_name]

    def get_model_config(self, provider_name: str | None = None) -> ModelConfig:
        """Return the ``ModelConfig`` produced by the selected strategy."""
        return self.get_strategy(provider_name).get_config()

    def validate_environment(self, provider_name: str | None = None) -> dict[str, str]:
        """Return the required variables that are unset or empty.

        Keys are variable names; every value is the fixed message
        ``"Required but not set"``. A missing ``EXA_API_KEY`` is never
        reported as missing; it only produces a warning log line.
        """
        strategy = self.get_strategy(provider_name)
        required_vars = strategy.get_required_environment_variables()

        missing: dict[str, str] = {
            var_name: "Required but not set"
            for var_name, is_optional in required_vars.items()
            if not is_optional and not os.environ.get(var_name)
        }

        if not os.environ.get("EXA_API_KEY"):
            # Optional key: warn, do not fail startup.
            import logging

            logging.getLogger(__name__).warning(
                "EXA_API_KEY not found. Research tools will be disabled."
            )

        return missing

    def get_available_providers(self) -> list[str]:
        """Return the registered provider names."""
        return list(self._strategies)
356 | 
357 | 
358 | # Singleton instance for dependency injection
359 | _config_manager = ConfigurationManager()
360 | 
361 | 
362 | # Public API functions
def get_model_config(provider_name: str | None = None) -> ModelConfig:
    """Return the model configuration for ``provider_name`` from the shared manager."""
    config = _config_manager.get_model_config(provider_name)
    return config
366 | 
367 | 
def check_required_api_keys(provider_name: str | None = None) -> list[str]:
    """Return the names of required environment variables that are not set."""
    return list(_config_manager.validate_environment(provider_name))
372 | 
373 | 
def register_provider_strategy(name: str, strategy: ConfigurationStrategy) -> None:
    """Add ``strategy`` to the shared manager under ``name``."""
    manager = _config_manager
    manager.register_strategy(name, strategy)
377 | 
378 | 
def get_available_providers() -> list[str]:
    """Return the provider names known to the shared manager."""
    providers = _config_manager.get_available_providers()
    return providers
382 | 
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/routing/agno_workflow_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Multi-Thinking Workflow Router - Complete Rewrite.
  2 | 
  3 | 纯净的多向思维工作流实现, 基于Agno v2.0框架。
  4 | 完全移除旧的复杂度路由, 专注于多向思维方法论。
  5 | """
  6 | 
  7 | import logging
  8 | import re
  9 | import time
 10 | from dataclasses import dataclass
 11 | from typing import Any
 12 | 
 13 | from agno.workflow.router import Router
 14 | from agno.workflow.step import Step
 15 | from agno.workflow.types import StepInput, StepOutput
 16 | from agno.workflow.workflow import Workflow
 17 | 
 18 | from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config
 19 | 
 20 | # Import at module level - moved from function to avoid PLC0415
 21 | from mcp_server_mas_sequential_thinking.core.models import ThoughtData
 22 | from mcp_server_mas_sequential_thinking.processors.multi_thinking_processor import (
 23 |     MultiThinkingSequentialProcessor,
 24 |     create_multi_thinking_step_output,
 25 | )
 26 | 
 27 | logger = logging.getLogger(__name__)
 28 | 
 29 | 
@dataclass
class MultiThinkingWorkflowResult:
    """Result from Multi-Thinking workflow execution.

    Built by ``MultiThinkingWorkflowRouter.process_thought_workflow`` on both its
    success path and its error-fallback path.
    """

    # Cleaned response text, or an error message when the workflow raised.
    content: str
    # Strategy label; "error_fallback" when the workflow raised.
    strategy_used: str
    # Elapsed wall-clock time in seconds, measured with time.time().
    processing_time: float
    # Complexity score reported by the processing step (0.0 on the error path).
    complexity_score: float
    # Stage label: "multi_thinking_execution" or "error_handling".
    step_name: str
    # Thinking sequence reported by the processor (empty list on the error path).
    thinking_sequence: list[str]
    # Cost reduction; the router logs it as a percentage (0.0 on the error path).
    cost_reduction: float
 41 | 
 42 | 
 43 | class MultiThinkingWorkflowRouter:
 44 |     """Pure Multi-Thinking workflow router using Agno v2.0."""
 45 | 
 46 |     def __init__(self) -> None:
 47 |         """Initialize Multi-Thinking workflow router."""
 48 |         self.model_config = get_model_config()
 49 | 
 50 |         # Initialize Multi-Thinking processor
 51 |         self.multi_thinking_processor = MultiThinkingSequentialProcessor()
 52 | 
 53 |         # Create Multi-Thinking processing step
 54 |         self.multi_thinking_step = self._create_multi_thinking_step()
 55 | 
 56 |         # Create router that always selects Multi-Thinking
 57 |         self.router = Router(
 58 |             name="multi_thinking_router",
 59 |             selector=self._multi_thinking_selector,
 60 |             choices=[self.multi_thinking_step],
 61 |         )
 62 | 
 63 |         # Create main workflow
 64 |         self.workflow = Workflow(
 65 |             name="multi_thinking_workflow",
 66 |             steps=[self.router],
 67 |         )
 68 | 
 69 |         logger.info("Multi-Thinking Workflow Router initialized")
 70 | 
 71 |     def _multi_thinking_selector(self, step_input: StepInput) -> list[Step]:
 72 |         """Selector that always returns Multi-Thinking processing."""
 73 |         try:
 74 |             logger.info("🎩 MULTI-THINKING WORKFLOW ROUTING:")
 75 | 
 76 |             # Extract thought content for logging
 77 |             if isinstance(step_input.input, dict):
 78 |                 thought_content = step_input.input.get("thought", "")
 79 |                 thought_number = step_input.input.get("thought_number", 1)
 80 |                 total_thoughts = step_input.input.get("total_thoughts", 1)
 81 |             else:
 82 |                 thought_content = str(step_input.input)
 83 |                 thought_number = 1
 84 |                 total_thoughts = 1
 85 | 
 86 |             logger.info(
 87 |                 "  📝 Input: %s%s",
 88 |                 thought_content[:100],
 89 |                 "..." if len(thought_content) > 100 else "",
 90 |             )
 91 |             logger.info("  🔢 Progress: %s/%s", thought_number, total_thoughts)
 92 |             logger.info("  ✅ Multi-Thinking selected - exclusive thinking methodology")
 93 | 
 94 |             return [self.multi_thinking_step]
 95 | 
 96 |         except Exception as e:
 97 |             logger.exception("Error in Multi-Thinking selector: %s", e)
 98 |             logger.warning("Continuing with Multi-Thinking processing despite error")
 99 |             return [self.multi_thinking_step]
100 | 
101 |     def _create_multi_thinking_step(self) -> Step:
102 |         """Create Six Thinking Hats processing step."""
103 | 
104 |         async def multi_thinking_executor(
105 |             step_input: StepInput, session_state: dict[str, Any]
106 |         ) -> StepOutput:
107 |             """Execute Multi-Thinking thinking process."""
108 |             try:
109 |                 logger.info("🎩 MULTI-THINKING STEP EXECUTION:")
110 | 
111 |                 # Extract thought content and metadata
112 |                 if isinstance(step_input.input, dict):
113 |                     thought_content = step_input.input.get(
114 |                         "thought", str(step_input.input)
115 |                     )
116 |                     thought_number = step_input.input.get("thought_number", 1)
117 |                     total_thoughts = step_input.input.get("total_thoughts", 1)
118 |                     context = step_input.input.get("context", "")
119 |                 else:
120 |                     thought_content = str(step_input.input)
121 |                     thought_number = 1
122 |                     total_thoughts = 1
123 |                     context = ""
124 | 
125 |                 # ThoughtData is now imported at module level
126 | 
127 |                 # Extract context preservation fields from input if available
128 |                 if isinstance(step_input.input, dict):
129 |                     is_revision = step_input.input.get("isRevision", False)
130 |                     branch_from_thought = step_input.input.get("branchFromThought")
131 |                     branch_id = step_input.input.get("branchId")
132 |                     needs_more_thoughts = step_input.input.get(
133 |                         "needsMoreThoughts", False
134 |                     )
135 |                     next_thought_needed = step_input.input.get(
136 |                         "nextThoughtNeeded", True
137 |                     )
138 |                 else:
139 |                     is_revision = False
140 |                     branch_from_thought = None
141 |                     branch_id = None
142 |                     needs_more_thoughts = False
143 |                     next_thought_needed = True
144 | 
145 |                 thought_data = ThoughtData(
146 |                     thought=thought_content,
147 |                     thoughtNumber=thought_number,
148 |                     totalThoughts=total_thoughts,
149 |                     nextThoughtNeeded=next_thought_needed,
150 |                     isRevision=is_revision,
151 |                     branchFromThought=branch_from_thought,
152 |                     branchId=branch_id,
153 |                     needsMoreThoughts=needs_more_thoughts,
154 |                 )
155 | 
156 |                 logger.info("  📝 Input: %s...", thought_content[:100])
157 |                 logger.info("  🔢 Thought: %s/%s", thought_number, total_thoughts)
158 | 
159 |                 # Process with Multi-Thinking
160 |                 result = (
161 |                     await self.multi_thinking_processor.process_with_multi_thinking(
162 |                         thought_data, context
163 |                     )
164 |                 )
165 | 
166 |                 # Store metadata in session_state
167 |                 session_state["current_strategy"] = result.strategy_used
168 |                 session_state["current_complexity_score"] = result.complexity_score
169 |                 session_state["thinking_sequence"] = result.thinking_sequence
170 |                 session_state["cost_reduction"] = result.cost_reduction
171 | 
172 |                 logger.info("  ✅ Multi-Thinking completed: %s", result.strategy_used)
173 |                 logger.info("  📊 Complexity: %.1f", result.complexity_score)
174 |                 logger.info("  💰 Cost Reduction: %.1f%%", result.cost_reduction)
175 | 
176 |                 return create_multi_thinking_step_output(result)
177 | 
178 |             except Exception as e:
179 |                 logger.exception("  ❌ Multi-Thinking execution failed")
180 |                 return StepOutput(
181 |                     content=f"Multi-Thinking processing failed: {e!s}",
182 |                     success=False,
183 |                     error=str(e),
184 |                     step_name="multi_thinking_error",
185 |                 )
186 | 
187 |         return Step(
188 |             name="multi_thinking_processing",
189 |             executor=multi_thinking_executor,
190 |             description="Six Thinking Hats sequential processing",
191 |         )
192 | 
193 |     async def process_thought_workflow(
194 |         self, thought_data: "ThoughtData", context_prompt: str
195 |     ) -> MultiThinkingWorkflowResult:
196 |         """Process thought using Multi-Thinking workflow."""
197 |         start_time = time.time()
198 | 
199 |         try:
200 |             logger.info("🚀 MULTI-THINKING WORKFLOW INITIALIZATION:")
201 |             logger.info(
202 |                 "  📝 Thought: %s%s",
203 |                 thought_data.thought[:100],
204 |                 "..." if len(thought_data.thought) > 100 else "",
205 |             )
206 |             logger.info(
207 |                 "  🔢 Thought Number: %s/%s",
208 |                 thought_data.thoughtNumber,
209 |                 thought_data.totalThoughts,
210 |             )
211 |             logger.info("  📋 Context Length: %d chars", len(context_prompt))
212 |             logger.info(
213 |                 "  ⏰ Start Time: %s",
214 |                 time.strftime("%H:%M:%S", time.localtime(start_time)),
215 |             )
216 | 
217 |             # Prepare workflow input for Multi-Thinking
218 |             workflow_input = {
219 |                 "thought": thought_data.thought,
220 |                 "thought_number": thought_data.thoughtNumber,
221 |                 "total_thoughts": thought_data.totalThoughts,
222 |                 "context": context_prompt,
223 |             }
224 | 
225 |             logger.info("📦 WORKFLOW INPUT PREPARATION:")
226 |             logger.info("  📊 Input Keys: %s", list(workflow_input.keys()))
227 |             logger.info("  📏 Input Size: %d chars", len(str(workflow_input)))
228 | 
229 |             # Initialize session_state for metadata tracking
230 |             session_state: dict[str, Any] = {
231 |                 "start_time": start_time,
232 |                 "thought_number": thought_data.thoughtNumber,
233 |                 "total_thoughts": thought_data.totalThoughts,
234 |             }
235 | 
236 |             logger.info("🎯 SESSION STATE SETUP:")
237 |             logger.info("  🔑 State Keys: %s", list(session_state.keys()))
238 |             logger.info("  📈 Metadata: %s", session_state)
239 | 
240 |             logger.info(
241 |                 "▶️  EXECUTING Multi-Thinking workflow for thought #%s",
242 |                 thought_data.thoughtNumber,
243 |             )
244 | 
245 |             # Execute Multi-Thinking workflow
246 |             logger.info("🔄 WORKFLOW EXECUTION START...")
247 |             result = await self.workflow.arun(
248 |                 input=workflow_input, session_state=session_state
249 |             )
250 |             logger.info("✅ WORKFLOW EXECUTION COMPLETED")
251 | 
252 |             processing_time = time.time() - start_time
253 | 
254 |             # Extract clean content from result
255 |             content = self._extract_clean_content(result)
256 | 
257 |             logger.info("📋 CONTENT VALIDATION:")
258 |             logger.info(
259 |                 "  ✅ Content extracted successfully: %d characters", len(content)
260 |             )
261 |             logger.info(
262 |                 "  📝 Content preview: %s%s",
263 |                 content[:150],
264 |                 "..." if len(content) > 150 else "",
265 |             )
266 | 
267 |             # Get metadata from session_state
268 |             complexity_score = session_state.get("current_complexity_score", 0.0)
269 |             strategy_used = session_state.get("current_strategy", "multi_thinking")
270 |             thinking_sequence = session_state.get("thinking_sequence", [])
271 |             cost_reduction = session_state.get("cost_reduction", 0.0)
272 | 
273 |             logger.info("📊 WORKFLOW RESULT COMPILATION:")
274 |             logger.info("  🎯 Strategy used: %s", strategy_used)
275 |             logger.info("  🧠 Thinking sequence: %s", " → ".join(thinking_sequence))
276 |             logger.info("  📈 Complexity score: %.1f", complexity_score)
277 |             logger.info("  💰 Cost reduction: %.1f%%", cost_reduction)
278 |             logger.info("  ⏱️  Processing time: %.3fs", processing_time)
279 | 
280 |             workflow_result = MultiThinkingWorkflowResult(
281 |                 content=content,
282 |                 strategy_used=strategy_used,
283 |                 processing_time=processing_time,
284 |                 complexity_score=complexity_score,
285 |                 step_name="multi_thinking_execution",
286 |                 thinking_sequence=thinking_sequence,
287 |                 cost_reduction=cost_reduction,
288 |             )
289 | 
290 |             logger.info("🎉 MULTI-THINKING WORKFLOW COMPLETION:")
291 |             logger.info(
292 |                 "  ✅ Completed: strategy=%s, time=%.3fs, score=%.1f, reduction=%.1f%%",
293 |                 strategy_used,
294 |                 processing_time,
295 |                 complexity_score,
296 |                 cost_reduction,
297 |             )
298 | 
299 |             return workflow_result
300 | 
301 |         except Exception as e:
302 |             processing_time = time.time() - start_time
303 |             logger.exception(
304 |                 "Multi-Thinking workflow execution failed after %.3fs", processing_time
305 |             )
306 | 
307 |             return MultiThinkingWorkflowResult(
308 |                 content=f"Error processing thought with Multi-Thinking: {e!s}",
309 |                 strategy_used="error_fallback",
310 |                 processing_time=processing_time,
311 |                 complexity_score=0.0,
312 |                 step_name="error_handling",
313 |                 thinking_sequence=[],
314 |                 cost_reduction=0.0,
315 |             )
316 | 
317 |     def _extract_clean_content(self, result: Any) -> str:
318 |         """Extract clean content from workflow result."""
319 | 
320 |         def extract_recursive(obj: Any, depth: int = 0) -> str:
321 |             """Recursively extract clean content from nested objects."""
322 |             if depth > 10:  # Prevent infinite recursion
323 |                 return str(obj)
324 | 
325 |             # Handle dictionary with common content keys
326 |             if isinstance(obj, dict):
327 |                 for key in [
328 |                     "result",
329 |                     "content",
330 |                     "message",
331 |                     "text",
332 |                     "response",
333 |                     "output",
334 |                     "answer",
335 |                 ]:
336 |                     if key in obj:
337 |                         return extract_recursive(obj[key], depth + 1)
338 |                 # Fallback to any string content
339 |                 for value in obj.values():
340 |                     if isinstance(value, str) and len(value.strip()) > 10:
341 |                         return value.strip()
342 |                 return str(obj)
343 | 
344 |             # Handle objects with content attributes
345 |             if hasattr(obj, "content"):
346 |                 content = obj.content
347 |                 if isinstance(content, str):
348 |                     return content.strip()
349 |                 return extract_recursive(content, depth + 1)
350 | 
351 |             # Handle other output objects
352 |             if hasattr(obj, "output"):
353 |                 return extract_recursive(obj.output, depth + 1)
354 | 
355 |             # Handle list/tuple - extract first meaningful content
356 |             if isinstance(obj, (list, tuple)) and obj:
357 |                 for item in obj:
358 |                     result = extract_recursive(item, depth + 1)
359 |                     if isinstance(result, str) and len(result.strip()) > 10:
360 |                         return result.strip()
361 | 
362 |             # If it's already a string, clean it up
363 |             if isinstance(obj, str):
364 |                 content = obj.strip()
365 | 
366 |                 # Remove object representations
367 |                 if any(
368 |                     content.startswith(pattern)
369 |                     for pattern in [
370 |                         "RunOutput(",
371 |                         "TeamRunOutput(",
372 |                         "StepOutput(",
373 |                         "WorkflowResult(",
374 |                         "{'result':",
375 |                         '{"result":',
376 |                         "{'content':",
377 |                         '{"content":',
378 |                     ]
379 |                 ):
380 |                     # Try to extract content using regex (re imported at module level)
381 | 
382 |                     patterns = [
383 |                         (r"content='([^']*)'", 1),
384 |                         (r'content="([^"]*)"', 1),
385 |                         (r"'result':\s*'([^']*)'", 1),
386 |                         (r'"result":\s*"([^"]*)"', 1),
387 |                         (r"'([^']{20,})'", 1),
388 |                         (r'"([^"]{20,})"', 1),
389 |                     ]
390 | 
391 |                     for pattern, group in patterns:
392 |                         match = re.search(pattern, content)
393 |                         if match:
394 |                             extracted = match.group(group).strip()
395 |                             if len(extracted) > 10:
396 |                                 return extracted
397 | 
398 |                     # Clean up object syntax
399 |                     cleaned = re.sub(r'[{}()"\']', " ", content)
400 |                     cleaned = re.sub(
401 |                         r"\b(RunOutput|TeamRunOutput|StepOutput|content|result|success|error)\b",
402 |                         " ",
403 |                         cleaned,
404 |                     )
405 |                     cleaned = re.sub(r"\s+", " ", cleaned).strip()
406 | 
407 |                     if len(cleaned) > 20:
408 |                         return cleaned
409 | 
410 |                 return content
411 | 
412 |             # Fallback
413 |             result = str(obj).strip()
414 |             if len(result) > 20 and not any(
415 |                 result.startswith(pattern)
416 |                 for pattern in ["RunOutput(", "TeamRunOutput(", "StepOutput(", "<"]
417 |             ):
418 |                 return result
419 | 
420 |             return "Multi-Thinking processing completed successfully"
421 | 
422 |         return extract_recursive(result)
423 | 
424 | 
# Backward-compatibility aliases: keep the older AgnoWorkflowRouter and
# WorkflowResult names bound to the Multi-Thinking classes defined above.
AgnoWorkflowRouter = MultiThinkingWorkflowRouter
WorkflowResult = MultiThinkingWorkflowResult
428 | 
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/infrastructure/persistent_memory.py:
--------------------------------------------------------------------------------

```python
  1 | """Persistent memory management with SQLAlchemy and memory pruning."""
  2 | 
  3 | import os
  4 | from datetime import datetime, timedelta
  5 | from pathlib import Path
  6 | 
  7 | from sqlalchemy import (
  8 |     JSON,
  9 |     Boolean,
 10 |     Column,
 11 |     DateTime,
 12 |     Float,
 13 |     ForeignKey,
 14 |     Index,
 15 |     Integer,
 16 |     String,
 17 |     Text,
 18 |     create_engine,
 19 |     desc,
 20 | )
 21 | from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker
 22 | from sqlalchemy.pool import StaticPool
 23 | 
 24 | from mcp_server_mas_sequential_thinking.config.constants import (
 25 |     DatabaseConstants,
 26 |     DefaultSettings,
 27 | )
 28 | from mcp_server_mas_sequential_thinking.core.models import ThoughtData
 29 | 
 30 | from .logging_config import get_logger
 31 | 
 32 | logger = get_logger(__name__)
 33 | 
 34 | 
class Base(DeclarativeBase):
    """Base class for SQLAlchemy models with modern typing support.

    Every ORM model in this module subclasses it; ``Base.metadata`` is what the
    memory manager passes to ``create_all`` to create the tables.
    """
 37 | 
 38 | 
class SessionRecord(Base):
    """Database model for session storage.

    One row per session in the ``sessions`` table. It holds running totals and
    owns the session's thoughts through the ``thoughts`` relationship.
    """

    __tablename__ = "sessions"

    # Caller-supplied session identifier (string primary key).
    id = Column(String, primary_key=True)
    # Timestamps default to datetime.utcnow; updated_at is also refreshed on UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Running totals maintained by the memory manager when thoughts are stored.
    total_thoughts = Column(Integer, default=0)
    total_cost = Column(Float, default=0.0)
    provider = Column(String, default="deepseek")

    # Relationship to thoughts; "all, delete-orphan" cascades session deletion
    # to its thought rows.
    thoughts = relationship(
        "ThoughtRecord", back_populates="session", cascade="all, delete-orphan"
    )

    # Index for performance
    __table_args__ = (
        Index("ix_session_created", "created_at"),
        Index("ix_session_updated", "updated_at"),
    )
 61 | 
 62 | 
class ThoughtRecord(Base):
    """Database model for thought storage.

    One row per stored thought in the ``thoughts`` table, linked to its session
    by ``session_id``. Processing metadata and the response are optional.
    """

    __tablename__ = "thoughts"

    id = Column(Integer, primary_key=True)
    session_id = Column(String, ForeignKey("sessions.id"), nullable=False)
    # Core thought fields, mirroring ThoughtData.
    thought_number = Column(Integer, nullable=False)
    thought = Column(Text, nullable=False)
    total_thoughts = Column(Integer, nullable=False)
    next_needed = Column(Boolean, nullable=False)
    # Branch information; both are NULL for thoughts that carry no branch data.
    branch_from = Column(Integer, nullable=True)
    branch_id = Column(String, nullable=True)

    # Processing metadata
    processing_strategy = Column(String, nullable=True)
    complexity_score = Column(Float, nullable=True)
    estimated_cost = Column(Float, nullable=True)
    actual_cost = Column(Float, nullable=True)
    token_usage = Column(Integer, nullable=True)
    processing_time = Column(Float, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    # Response data
    response = Column(Text, nullable=True)
    specialist_used = Column(JSON, nullable=True)  # List of specialists used

    # Relationship
    session = relationship("SessionRecord", back_populates="thoughts")

    # Indexes for performance
    __table_args__ = (
        Index("ix_thought_session", "session_id"),
        Index("ix_thought_number", "thought_number"),
        Index("ix_thought_created", "created_at"),
        Index("ix_thought_branch", "branch_from", "branch_id"),
    )

    def to_thought_data(self) -> ThoughtData:
        """Convert database record to ThoughtData model.

        ``isRevision`` and ``needsMoreThoughts`` have no backing column, so they
        are filled with fixed values rather than restored.
        """
        # NOTE(review): needsMoreThoughts=True is hard-coded for every restored
        # thought; confirm this is the intended default.
        return ThoughtData(
            thought=str(self.thought),
            thoughtNumber=int(self.thought_number),
            totalThoughts=int(self.total_thoughts),
            nextThoughtNeeded=bool(self.next_needed),
            isRevision=False,  # Assuming default value is intentional
            branchFromThought=self.branch_from,
            branchId=self.branch_id,
            needsMoreThoughts=True,  # Assuming default value is intentional
        )
116 | 
117 | 
class BranchRecord(Base):
    """Database model for branch tracking.

    One row per (session, branch) in the ``branches`` table, with a counter of
    the thoughts stored on that branch.
    """

    __tablename__ = "branches"

    id = Column(Integer, primary_key=True)
    session_id = Column(String, ForeignKey("sessions.id"), nullable=False)
    branch_id = Column(String, nullable=False)
    # Thought number the branch forks from; nullable.
    parent_thought = Column(Integer, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Column default of 0 is applied at INSERT; it is not set on a freshly
    # constructed, unflushed instance.
    thought_count = Column(Integer, default=0)

    __table_args__ = (
        Index("ix_branch_session", "session_id"),
        Index("ix_branch_id", "branch_id"),
    )
134 | 
135 | 
class UsageMetrics(Base):
    """Database model for usage tracking and cost optimization.

    Rows in ``usage_metrics`` are keyed by date, provider, processing strategy
    and complexity level, and carry aggregate counters.
    """

    __tablename__ = "usage_metrics"

    id = Column(Integer, primary_key=True)
    # Timestamp of the metrics row; defaults to datetime.utcnow.
    date = Column(DateTime, default=datetime.utcnow)
    provider = Column(String, nullable=False)
    processing_strategy = Column(String, nullable=False)
    complexity_level = Column(String, nullable=False)

    # Metrics
    thought_count = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)
    total_cost = Column(Float, default=0.0)
    avg_processing_time = Column(Float, default=0.0)
    success_rate = Column(Float, default=1.0)

    # Performance tracking
    token_efficiency = Column(Float, default=0.0)  # quality/token ratio
    cost_effectiveness = Column(Float, default=0.0)  # quality/cost ratio

    __table_args__ = (
        Index("ix_metrics_date", "date"),
        Index("ix_metrics_provider", "provider"),
        Index("ix_metrics_strategy", "processing_strategy"),
    )
163 | 
164 | 
165 | class PersistentMemoryManager:
166 |     """Manages persistent storage and memory pruning."""
167 | 
168 |     def __init__(self, database_url: str | None = None) -> None:
169 |         """Initialize persistent memory manager."""
170 |         # Default to local SQLite database
171 |         if database_url is None:
172 |             db_dir = Path.home() / ".sequential_thinking" / "data"
173 |             db_dir.mkdir(parents=True, exist_ok=True)
174 |             database_url = f"sqlite:///{db_dir}/memory.db"
175 | 
176 |         # Configure engine with connection pooling
177 |         if database_url.startswith("sqlite"):
178 |             # SQLite-specific configuration
179 |             self.engine = create_engine(
180 |                 database_url,
181 |                 poolclass=StaticPool,
182 |                 connect_args={"check_same_thread": False},
183 |                 echo=False,
184 |             )
185 |         else:
186 |             # PostgreSQL/other database configuration
187 |             self.engine = create_engine(
188 |                 database_url,
189 |                 pool_pre_ping=True,
190 |                 pool_size=DatabaseConstants.CONNECTION_POOL_SIZE,
191 |                 max_overflow=DatabaseConstants.CONNECTION_POOL_OVERFLOW,
192 |             )
193 | 
194 |         # Create tables
195 |         Base.metadata.create_all(self.engine)
196 | 
197 |         # Session factory
198 |         self.SessionLocal = sessionmaker(bind=self.engine)
199 | 
200 |         logger.info(f"Persistent memory initialized with database: {database_url}")
201 | 
202 |     def create_session(
203 |         self, session_id: str, provider: str = DefaultSettings.DEFAULT_PROVIDER
204 |     ) -> None:
205 |         """Create a new session record."""
206 |         with self.SessionLocal() as db:
207 |             existing = (
208 |                 db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
209 |             )
210 | 
211 |             if not existing:
212 |                 session_record = SessionRecord(id=session_id, provider=provider)
213 |                 db.add(session_record)
214 |                 db.commit()
215 |                 logger.info(f"Created new session: {session_id}")
216 | 
217 |     def store_thought(
218 |         self,
219 |         session_id: str,
220 |         thought_data: ThoughtData,
221 |         response: str | None = None,
222 |         processing_metadata: dict | None = None,
223 |     ) -> int:
224 |         """Store a thought and return its database ID."""
225 |         with self.SessionLocal() as db:
226 |             session_record = self._ensure_session_exists(db, session_id)
227 |             thought_record = self._create_thought_record(
228 |                 session_id, thought_data, response, processing_metadata
229 |             )
230 | 
231 |             db.add(thought_record)
232 |             self._update_session_stats(session_record, processing_metadata)
233 |             self._handle_branching(db, session_id, thought_data)
234 | 
235 |             db.commit()
236 |             return int(thought_record.id)
237 | 
238 |     def _ensure_session_exists(self, db: Session, session_id: str) -> SessionRecord:
239 |         """Ensure session exists in database and return it."""
240 |         session_record = (
241 |             db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
242 |         )
243 | 
244 |         if not session_record:
245 |             self.create_session(session_id)
246 |             session_record = (
247 |                 db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
248 |             )
249 | 
250 |         return session_record
251 | 
252 |     def _create_thought_record(
253 |         self,
254 |         session_id: str,
255 |         thought_data: ThoughtData,
256 |         response: str | None,
257 |         processing_metadata: dict | None,
258 |     ) -> ThoughtRecord:
259 |         """Create a thought record with metadata."""
260 |         thought_record = ThoughtRecord(
261 |             session_id=session_id,
262 |             thought_number=thought_data.thoughtNumber,
263 |             thought=thought_data.thought,
264 |             total_thoughts=thought_data.totalThoughts,
265 |             next_needed=thought_data.nextThoughtNeeded,
266 |             branch_from=thought_data.branchFromThought,
267 |             branch_id=thought_data.branchId,
268 |             response=response,
269 |         )
270 | 
271 |         if processing_metadata:
272 |             self._apply_processing_metadata(thought_record, processing_metadata)
273 | 
274 |         return thought_record
275 | 
276 |     def _apply_processing_metadata(
277 |         self, thought_record: ThoughtRecord, metadata: dict
278 |     ) -> None:
279 |         """Apply processing metadata to thought record."""
280 |         if strategy := metadata.get("strategy"):
281 |             thought_record.processing_strategy = str(strategy)  # type: ignore[assignment]
282 |         if complexity_score := metadata.get("complexity_score"):
283 |             thought_record.complexity_score = float(complexity_score)  # type: ignore[assignment]
284 |         if estimated_cost := metadata.get("estimated_cost"):
285 |             thought_record.estimated_cost = float(estimated_cost)  # type: ignore[assignment]
286 |         if actual_cost := metadata.get("actual_cost"):
287 |             thought_record.actual_cost = float(actual_cost)  # type: ignore[assignment]
288 |         if token_usage := metadata.get("token_usage"):
289 |             thought_record.token_usage = int(token_usage)  # type: ignore[assignment]
290 |         if processing_time := metadata.get("processing_time"):
291 |             thought_record.processing_time = float(processing_time)  # type: ignore[assignment]
292 | 
293 |         thought_record.specialist_used = metadata.get("specialists", [])
294 |         thought_record.processed_at = datetime.utcnow()  # type: ignore[assignment]
295 | 
296 |     def _update_session_stats(
297 |         self, session_record: SessionRecord, processing_metadata: dict | None
298 |     ) -> None:
299 |         """Update session statistics."""
300 |         if session_record:
301 |             session_record.totalThoughts += 1  # type: ignore[assignment]
302 |             session_record.updated_at = datetime.utcnow()  # type: ignore[assignment]
303 |             if processing_metadata and processing_metadata.get("actual_cost"):
304 |                 session_record.total_cost += float(processing_metadata["actual_cost"])  # type: ignore[assignment]
305 | 
306 |     def _handle_branching(
307 |         self, db: Session, session_id: str, thought_data: ThoughtData
308 |     ) -> None:
309 |         """Handle branch record creation and updates."""
310 |         if not thought_data.branchId:
311 |             return
312 | 
313 |         branch_record = (
314 |             db.query(BranchRecord)
315 |             .filter(
316 |                 BranchRecord.session_id == session_id,
317 |                 BranchRecord.branchId == thought_data.branchId,
318 |             )
319 |             .first()
320 |         )
321 | 
322 |         if not branch_record:
323 |             branch_record = BranchRecord(
324 |                 session_id=session_id,
325 |                 branch_id=thought_data.branchId,
326 |                 parent_thought=thought_data.branchFromThought,
327 |             )
328 |             db.add(branch_record)
329 | 
330 |         branch_record.thought_count += 1  # type: ignore[assignment]
331 | 
332 |     def get_session_thoughts(
333 |         self, session_id: str, limit: int | None = None
334 |     ) -> list[ThoughtRecord]:
335 |         """Retrieve thoughts for a session."""
336 |         with self.SessionLocal() as db:
337 |             query = (
338 |                 db.query(ThoughtRecord)
339 |                 .filter(ThoughtRecord.session_id == session_id)
340 |                 .order_by(ThoughtRecord.thoughtNumber)
341 |             )
342 | 
343 |             if limit:
344 |                 query = query.limit(limit)
345 | 
346 |             return query.all()
347 | 
348 |     def get_thought_by_number(
349 |         self, session_id: str, thought_number: int
350 |     ) -> ThoughtRecord | None:
351 |         """Get a specific thought by number."""
352 |         with self.SessionLocal() as db:
353 |             return (
354 |                 db.query(ThoughtRecord)
355 |                 .filter(
356 |                     ThoughtRecord.session_id == session_id,
357 |                     ThoughtRecord.thoughtNumber == thought_number,
358 |                 )
359 |                 .first()
360 |             )
361 | 
362 |     def get_branch_thoughts(
363 |         self, session_id: str, branch_id: str
364 |     ) -> list[ThoughtRecord]:
365 |         """Get all thoughts in a specific branch."""
366 |         with self.SessionLocal() as db:
367 |             return (
368 |                 db.query(ThoughtRecord)
369 |                 .filter(
370 |                     ThoughtRecord.session_id == session_id,
371 |                     ThoughtRecord.branchId == branch_id,
372 |                 )
373 |                 .order_by(ThoughtRecord.thoughtNumber)
374 |                 .all()
375 |             )
376 | 
377 |     def prune_old_sessions(
378 |         self, older_than_days: int = 30, keep_recent: int = 100
379 |     ) -> int:
380 |         """Prune old sessions to manage storage space."""
381 |         cutoff_date = datetime.utcnow() - timedelta(days=older_than_days)
382 | 
383 |         with self.SessionLocal() as db:
384 |             # Get sessions older than cutoff, excluding most recent ones
385 |             old_sessions = (
386 |                 db.query(SessionRecord)
387 |                 .filter(SessionRecord.updated_at < cutoff_date)
388 |                 .order_by(desc(SessionRecord.updated_at))
389 |                 .offset(keep_recent)
390 |             )
391 | 
392 |             deleted_count = 0
393 |             for session in old_sessions:
394 |                 db.delete(session)  # Cascade will handle thoughts and branches
395 |                 deleted_count += 1
396 | 
397 |             if deleted_count > 0:
398 |                 db.commit()
399 |                 logger.info(f"Pruned {deleted_count} old sessions")
400 | 
401 |             return deleted_count
402 | 
403 |     def get_usage_stats(self, days_back: int = 7) -> dict:
404 |         """Get usage statistics for cost optimization."""
405 |         cutoff_date = datetime.utcnow() - timedelta(days=days_back)
406 | 
407 |         with self.SessionLocal() as db:
408 |             # Session stats
409 |             session_count = (
410 |                 db.query(SessionRecord)
411 |                 .filter(SessionRecord.created_at >= cutoff_date)
412 |                 .count()
413 |             )
414 | 
415 |             # Thought stats
416 |             thought_stats = (
417 |                 db.query(ThoughtRecord)
418 |                 .filter(ThoughtRecord.created_at >= cutoff_date)
419 |                 .all()
420 |             )
421 | 
422 |             total_thoughts = len(thought_stats)
423 |             # Explicit type casting to resolve SQLAlchemy Column type issues
424 |             total_cost: float = float(
425 |                 sum(float(t.actual_cost or 0) for t in thought_stats)
426 |             )
427 |             total_tokens: int = int(sum(int(t.token_usage or 0) for t in thought_stats))
428 |             avg_processing_time = sum(
429 |                 t.processing_time or 0 for t in thought_stats
430 |             ) / max(total_thoughts, 1)
431 | 
432 |             # Strategy breakdown
433 |             strategy_stats = {}
434 |             for thought in thought_stats:
435 |                 strategy = thought.processing_strategy or "unknown"
436 |                 if strategy not in strategy_stats:
437 |                     strategy_stats[strategy] = {"count": 0, "cost": 0, "tokens": 0}
438 |                 strategy_stats[strategy]["count"] += 1
439 |                 strategy_stats[strategy]["cost"] += thought.actual_cost or 0
440 |                 strategy_stats[strategy]["tokens"] += thought.token_usage or 0
441 | 
442 |             return {
443 |                 "period_days": days_back,
444 |                 "session_count": session_count,
445 |                 "total_thoughts": total_thoughts,
446 |                 "total_cost": total_cost,
447 |                 "total_tokens": total_tokens,
448 |                 "avg_cost_per_thought": total_cost / max(total_thoughts, 1),
449 |                 "avg_tokens_per_thought": total_tokens / max(total_thoughts, 1),
450 |                 "avg_processing_time": avg_processing_time,
451 |                 "strategy_breakdown": strategy_stats,
452 |             }
453 | 
454 |     def record_usage_metrics(
455 |         self,
456 |         provider: str,
457 |         processing_strategy: str,
458 |         complexity_level: str,
459 |         thought_count: int = 1,
460 |         tokens: int = 0,
461 |         cost: float = 0.0,
462 |         processing_time: float = 0.0,
463 |         success: bool = True,
464 |     ) -> None:
465 |         """Record usage metrics for cost optimization."""
466 |         with self.SessionLocal() as db:
467 |             # Check if we have a record for today
468 |             today = datetime.utcnow().date()
469 |             existing = (
470 |                 db.query(UsageMetrics)
471 |                 .filter(
472 |                     UsageMetrics.date >= datetime.combine(today, datetime.min.time()),
473 |                     UsageMetrics.provider == provider,
474 |                     UsageMetrics.processing_strategy == processing_strategy,
475 |                     UsageMetrics.complexity_level == complexity_level,
476 |                 )
477 |                 .first()
478 |             )
479 | 
480 |             if existing:
481 |                 # Update existing record
482 |                 existing.thought_count += thought_count  # type: ignore[assignment]
483 |                 existing.total_tokens += tokens  # type: ignore[assignment]
484 |                 existing.total_cost += cost  # type: ignore[assignment]
485 |                 existing.avg_processing_time = (  # type: ignore[assignment]
486 |                     float(existing.avg_processing_time)
487 |                     * (existing.thought_count - thought_count)
488 |                     + processing_time * thought_count
489 |                 ) / existing.thought_count
490 |                 if not success:
491 |                     existing.success_rate = (  # type: ignore[assignment]
492 |                         float(existing.success_rate)
493 |                         * (existing.thought_count - thought_count)
494 |                     ) / existing.thought_count
495 |             else:
496 |                 # Create new record
497 |                 metrics = UsageMetrics(
498 |                     provider=provider,
499 |                     processing_strategy=processing_strategy,
500 |                     complexity_level=complexity_level,
501 |                     thought_count=thought_count,
502 |                     total_tokens=tokens,
503 |                     total_cost=cost,
504 |                     avg_processing_time=processing_time,
505 |                     success_rate=1.0 if success else 0.0,
506 |                 )
507 |                 db.add(metrics)
508 | 
509 |             db.commit()
510 | 
511 |     def optimize_database(self) -> None:
512 |         """Run database optimization tasks."""
513 |         with self.SessionLocal() as db:
514 |             from sqlalchemy import text
515 | 
516 |             if self.engine.dialect.name == "sqlite":
517 |                 db.execute(text("VACUUM"))
518 |                 db.execute(text("ANALYZE"))
519 |             else:
520 |                 db.execute(text("ANALYZE"))
521 |             db.commit()
522 |             logger.info("Database optimization completed")
523 | 
524 |     def close(self) -> None:
525 |         """Close database connections."""
526 |         self.engine.dispose()
527 | 
528 | 
529 | # Convenience functions
def create_persistent_memory(
    database_url: str | None = None,
) -> PersistentMemoryManager:
    """Build a ``PersistentMemoryManager`` for the given database URL.

    Args:
        database_url: Connection URL forwarded unchanged to the manager;
            ``None`` is passed through as-is.
    """
    manager = PersistentMemoryManager(database_url)
    return manager
535 | 
536 | 
def get_database_url_from_env() -> str:
    """Resolve the database URL, preferring the ``DATABASE_URL`` variable.

    When ``DATABASE_URL`` is unset or empty, a SQLite URL pointing at
    ``~/.sequential_thinking/data/memory.db`` is returned and the containing
    directory is created if it does not exist yet.
    """
    configured_url = os.getenv("DATABASE_URL")
    if configured_url:
        return configured_url

    # Fall back to a local SQLite file under the user's home directory
    data_dir = Path.home() / ".sequential_thinking" / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    return f"sqlite:///{data_dir}/memory.db"
546 | 
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/processors/multi_thinking_core.py:
--------------------------------------------------------------------------------

```python
  1 | """Multi-Thinking Core Agent Architecture.
  2 | 
  3 | Multi-dimensional thinking direction Agent core implementation.
  4 | Strictly follows "focused thinking direction" principles, supporting intelligent sequences from single to multiple thinking modes.
  5 | """
  6 | 
  7 | # Lazy import to break circular dependency
  8 | import logging
  9 | import os
 10 | from abc import abstractmethod
 11 | from dataclasses import dataclass
 12 | from enum import Enum
 13 | from typing import Any
 14 | 
 15 | from agno.agent import Agent
 16 | from agno.models.base import Model
 17 | from agno.tools.reasoning import ReasoningTools
 18 | 
 19 | # Try to import ExaTools, gracefully handle if not available
 20 | try:
 21 |     from agno.tools.exa import ExaTools
 22 | 
 23 |     EXA_AVAILABLE = bool(os.environ.get("EXA_API_KEY"))
 24 | except ImportError:
 25 |     ExaTools = None
 26 |     EXA_AVAILABLE = False
 27 | 
 28 | logger = logging.getLogger(__name__)
 29 | 
 30 | 
 31 | class ThinkingDirection(Enum):
 32 |     """Multi-thinking direction enumeration."""
 33 | 
 34 |     FACTUAL = "factual"  # Facts and data
 35 |     EMOTIONAL = "emotional"  # Emotions and intuition
 36 |     CRITICAL = "critical"  # Criticism and risk
 37 |     OPTIMISTIC = "optimistic"  # Optimism and value
 38 |     CREATIVE = "creative"  # Creativity and innovation
 39 |     SYNTHESIS = "synthesis"  # Metacognition and integration
 40 | 
 41 | 
 42 | class ProcessingDepth(Enum):
 43 |     """Processing complexity levels."""
 44 | 
 45 |     SINGLE = "single"  # Single thinking mode
 46 |     DOUBLE = "double"  # Double thinking sequence
 47 |     TRIPLE = "triple"  # Triple thinking sequence
 48 |     FULL = "full"  # Complete multi-thinking
 49 | 
 50 | 
 51 | @dataclass(frozen=True)
 52 | class ThinkingTimingConfig:
 53 |     """Thinking direction timing configuration."""
 54 | 
 55 |     direction: ThinkingDirection
 56 |     default_time_seconds: int
 57 |     min_time_seconds: int
 58 |     max_time_seconds: int
 59 |     is_quick_reaction: bool = (
 60 |         False  # Whether it's a quick reaction mode (like emotional thinking)
 61 |     )
 62 | 
 63 | 
 64 | # Timing configuration constants
 65 | THINKING_TIMING_CONFIGS = {
 66 |     ThinkingDirection.FACTUAL: ThinkingTimingConfig(
 67 |         ThinkingDirection.FACTUAL, 120, 60, 300, False
 68 |     ),
 69 |     ThinkingDirection.EMOTIONAL: ThinkingTimingConfig(
 70 |         ThinkingDirection.EMOTIONAL, 30, 15, 60, True
 71 |     ),  # Quick intuition
 72 |     ThinkingDirection.CRITICAL: ThinkingTimingConfig(
 73 |         ThinkingDirection.CRITICAL, 120, 60, 240, False
 74 |     ),
 75 |     ThinkingDirection.OPTIMISTIC: ThinkingTimingConfig(
 76 |         ThinkingDirection.OPTIMISTIC, 120, 60, 240, False
 77 |     ),
 78 |     ThinkingDirection.CREATIVE: ThinkingTimingConfig(
 79 |         ThinkingDirection.CREATIVE, 240, 120, 360, False
 80 |     ),  # Creativity requires more time
 81 |     ThinkingDirection.SYNTHESIS: ThinkingTimingConfig(
 82 |         ThinkingDirection.SYNTHESIS, 60, 30, 120, False
 83 |     ),
 84 | }
 85 | 
 86 | 
 87 | @dataclass(frozen=True)
 88 | class ThinkingCapability:
 89 |     """Multi-thinking capability definition."""
 90 | 
 91 |     thinking_direction: ThinkingDirection
 92 |     role: str
 93 |     description: str
 94 |     role_description: str
 95 | 
 96 |     # Cognitive characteristics
 97 |     thinking_mode: str
 98 |     cognitive_focus: str
 99 |     output_style: str
100 | 
101 |     # Time management
102 |     timing_config: ThinkingTimingConfig
103 | 
104 |     # Enhanced features
105 |     tools: list[Any] | None = None
106 |     reasoning_level: int = 1
107 |     memory_enabled: bool = False
108 | 
109 |     def __post_init__(self):
110 |         if self.tools is None:
111 |             # Default tools for all thinking directions
112 |             tools = [ReasoningTools]
113 | 
114 |             # Add ExaTools for all thinking directions except SYNTHESIS
115 |             if (
116 |                 EXA_AVAILABLE
117 |                 and ExaTools is not None
118 |                 and self.thinking_direction != ThinkingDirection.SYNTHESIS
119 |             ):
120 |                 tools.append(ExaTools)
121 | 
122 |             object.__setattr__(self, "tools", tools)
123 | 
124 |     def get_instructions(
125 |         self, context: str = "", previous_results: dict | None = None
126 |     ) -> list[str]:
127 |         """Generate specific analysis mode instructions."""
128 |         base_instructions = [
129 |             f"You are operating in {self.thinking_mode} analysis mode.",
130 |             f"Role: {self.role}",
131 |             f"Cognitive Focus: {self.cognitive_focus}",
132 |             "",
133 |             "CORE PRINCIPLES:",
134 |             f"1. Apply ONLY {self.thinking_mode} approach - maintain strict focus",
135 |             f"2. Time allocation: {self.timing_config.default_time_seconds} seconds for thorough analysis",
136 |             f"3. Output approach: {self.output_style}",
137 |             "",
138 |             f"Your specific responsibility: {self.role_description}",
139 |         ]
140 | 
141 |         # Add specific analysis mode detailed instructions
142 |         specific_instructions = self._get_specific_instructions()
143 |         base_instructions.extend(specific_instructions)
144 | 
145 |         # Add context and previous results
146 |         if context:
147 |             base_instructions.extend(
148 |                 [
149 |                     "",
150 |                     f"Context: {context}",
151 |                 ]
152 |             )
153 | 
154 |         if previous_results:
155 |             base_instructions.extend(
156 |                 [
157 |                     "",
158 |                     "Previous analysis insights from other perspectives:",
159 |                     *[
160 |                         f"  {self._format_previous_result_label(direction_name)}: {result[:100]}..."
161 |                         for direction_name, result in previous_results.items()
162 |                     ],
163 |                 ]
164 |             )
165 | 
166 |         return base_instructions
167 | 
168 |     def _format_previous_result_label(self, direction_name: str) -> str:
169 |         """Format previous result labels, using thinking direction concepts."""
170 |         label_mapping = {
171 |             "factual": "Factual analysis",
172 |             "emotional": "Emotional perspective",
173 |             "critical": "Critical evaluation",
174 |             "optimistic": "Optimistic view",
175 |             "creative": "Creative thinking",
176 |             "synthesis": "Strategic synthesis",
177 |         }
178 |         return label_mapping.get(direction_name.lower(), "Analysis")
179 | 
180 |     @abstractmethod
181 |     def _get_specific_instructions(self) -> list[str]:
182 |         """Get specific thinking direction detailed instructions."""
183 | 
184 | 
class FactualThinkingCapability(ThinkingCapability):
    """Capability for the FACTUAL direction: objective facts and data."""

    def __init__(self) -> None:
        direction = ThinkingDirection.FACTUAL
        super().__init__(
            thinking_direction=direction,
            role="Factual Information Processor",
            description="Collect and process objective facts and data",
            role_description="I focus only on objective facts and data. I provide neutral information without personal interpretation.",
            thinking_mode="analytical_factual",
            cognitive_focus="Pure information processing, zero emotion or judgment",
            output_style="Objective fact lists, data-driven information",
            timing_config=THINKING_TIMING_CONFIGS[direction],
            reasoning_level=2,
            memory_enabled=False,
        )

    def _get_specific_instructions(self) -> list[str]:
        """Return the factual guidelines, optional research tips and bans."""
        guidelines = [
            "",
            "FACTUAL ANALYSIS GUIDELINES:",
            "• Use simple statements to present facts: 'The data shows...', 'Known information is...'",
            "• Avoid technical jargon, explain data in everyday language",
            "• Present only verified facts and objective data",
            "• Avoid opinions, interpretations, or emotional reactions",
            "• Identify what information is missing and needed",
            "• Separate facts from assumptions clearly",
        ]

        # Only advertised when ExaTools is importable and enabled
        research = [
            "",
            "RESEARCH CAPABILITIES:",
            "• Use search_exa() to find current facts and data when needed",
            "• Search for recent information, statistics, or verified data",
            "• Cite sources when presenting factual information",
            "• Prioritize authoritative sources and recent data",
        ]

        forbidden = [
            "",
            "FORBIDDEN in factual analysis mode:",
            "- Personal opinions or judgments",
            "- Emotional responses or gut feelings",
            "- Speculation or 'what if' scenarios",
            "- Value judgments (good/bad, right/wrong)",
        ]

        if EXA_AVAILABLE and ExaTools is not None:
            return guidelines + research + forbidden
        return guidelines + forbidden
239 | 
240 | 
class EmotionalThinkingCapability(ThinkingCapability):
    """Capability for the EMOTIONAL direction: emotions and intuition."""

    def __init__(self) -> None:
        direction = ThinkingDirection.EMOTIONAL
        super().__init__(
            thinking_direction=direction,
            role="Intuitive Emotional Processor",
            description="Provide emotional responses and intuitive insights",
            role_description="I express intuition and emotional reactions. No reasoning needed, just share feelings.",
            thinking_mode="intuitive_emotional",
            cognitive_focus="Emotional intelligence and intuitive processing",
            output_style="Intuitive reactions, emotional expression, humanized perspective",
            timing_config=THINKING_TIMING_CONFIGS[direction],
            reasoning_level=1,  # Lowest rationality, highest intuition
            memory_enabled=False,
        )

    def _get_specific_instructions(self) -> list[str]:
        """Return the emotional guidelines, encouragements and reminder."""
        guidelines = [
            "",
            "EMOTIONAL INTUITIVE GUIDELINES:",
            "• Start responses with 'I feel...', 'My intuition tells me...', 'My gut reaction is...'",
            "• Keep expressions brief and powerful - 30-second emotional snapshots",
            "• Express immediate gut reactions and feelings",
            "• Share intuitive hunches without justification",
            "• Include visceral, immediate responses",
            "• NO need to explain or justify feelings",
        ]

        encouraged = [
            "",
            "ENCOURAGED in emotional intuitive mode:",
            "- First impressions and gut reactions",
            "- Emotional responses to ideas or situations",
            "- Intuitive concerns or excitement",
            "- 'Sixth sense' about what might work",
        ]

        reminder = [
            "",
            "Remember: This is a 30-second emotional snapshot, not analysis!",
        ]

        return guidelines + encouraged + reminder
277 | 
278 | 
class CriticalThinkingCapability(ThinkingCapability):
    """Capability for the CRITICAL direction: criticism and risk."""

    def __init__(self) -> None:
        direction = ThinkingDirection.CRITICAL
        super().__init__(
            thinking_direction=direction,
            role="Critical Risk Assessor",
            description="Critical analysis and risk identification",
            role_description="I identify risks and problems. Critical but not pessimistic, I point out real difficulties.",
            thinking_mode="critical_analytical",
            cognitive_focus="Critical thinking and risk assessment",
            output_style="Sharp questioning, risk warnings, logical verification",
            timing_config=THINKING_TIMING_CONFIGS[direction],
            reasoning_level=3,  # High logical reasoning
            memory_enabled=False,
        )

    def _get_specific_instructions(self) -> list[str]:
        """Return the critical guidelines, optional research tips and focus areas."""
        guidelines = [
            "",
            "CRITICAL ASSESSMENT GUIDELINES:",
            "• Point out specific possible problems, not general pessimism",
            "• Use phrases like 'The risk is...', 'This could fail because...', 'A problem might be...'",
            "• Identify potential problems, risks, and weaknesses",
            "• Challenge assumptions and look for logical flaws",
            "• Consider worst-case scenarios and failure modes",
            "• Provide logical reasons for all concerns raised",
        ]

        # Only advertised when ExaTools is importable and enabled
        research = [
            "",
            "RESEARCH FOR CRITICAL ANALYSIS:",
            "• Search for counterexamples, failed cases, or criticism of similar ideas",
            "• Look for expert opinions that identify risks or problems",
            "• Find case studies of failures in similar contexts",
            "• Research regulatory or compliance issues that might apply",
        ]

        focus_areas = [
            "",
            "KEY AREAS TO EXAMINE:",
            "- Logical inconsistencies in arguments",
            "- Practical obstacles and implementation challenges",
            "- Resource constraints and limitations",
            "- Potential negative consequences",
            "- Missing information or unproven assumptions",
            "",
            "Note: Be critical but constructive - identify real problems, not just pessimism.",
        ]

        if EXA_AVAILABLE and ExaTools is not None:
            return guidelines + research + focus_areas
        return guidelines + focus_areas
336 | 
337 | 
class OptimisticThinkingCapability(ThinkingCapability):
    """Capability for the OPTIMISTIC direction: optimism and value."""

    def __init__(self) -> None:
        direction = ThinkingDirection.OPTIMISTIC
        super().__init__(
            thinking_direction=direction,
            role="Optimistic Value Explorer",
            description="Positive thinking and value discovery",
            role_description="I find value and opportunities. Realistic optimism, I discover genuine benefits.",
            thinking_mode="optimistic_constructive",
            cognitive_focus="Positive psychology and opportunity identification",
            output_style="Positive exploration, value discovery, opportunity identification",
            timing_config=THINKING_TIMING_CONFIGS[direction],
            reasoning_level=2,
            memory_enabled=False,
        )

    def _get_specific_instructions(self) -> list[str]:
        """Return the optimistic guidelines, optional research tips and focus areas."""
        guidelines = [
            "",
            "OPTIMISTIC VALUE EXPLORATION GUIDELINES:",
            "• Point out specific feasible benefits, not empty praise",
            "• Use phrases like 'The benefit is...', 'This creates... value', 'An opportunity here is...'",
            "• Focus on benefits, values, and positive outcomes",
            "• Explore best-case scenarios and opportunities",
            "• Identify feasible positive possibilities",
            "• Provide logical reasons for optimism",
        ]

        # Only advertised when ExaTools is importable and enabled
        research = [
            "",
            "RESEARCH FOR OPTIMISTIC ANALYSIS:",
            "• Search for success stories and positive case studies",
            "• Look for evidence of benefits in similar situations",
            "• Find research supporting potential positive outcomes",
            "• Research market opportunities and growth potential",
        ]

        focus_areas = [
            "",
            "KEY AREAS TO EXPLORE:",
            "- Benefits and positive outcomes",
            "- Opportunities for growth or improvement",
            "- Feasible best-case scenarios",
            "- Value creation possibilities",
            "- Strengths and positive aspects",
            "- Why this could work well",
            "",
            "Note: Be realistically optimistic - find genuine value, not false hope.",
        ]

        if EXA_AVAILABLE and ExaTools is not None:
            return guidelines + research + focus_areas
        return guidelines + focus_areas
396 | 
397 | 
class CreativeThinkingCapability(ThinkingCapability):
    """Capability for the CREATIVE direction: creativity and innovation."""

    def __init__(self) -> None:
        direction = ThinkingDirection.CREATIVE
        super().__init__(
            thinking_direction=direction,
            role="Creative Innovation Generator",
            description="Creative thinking and innovative solutions",
            role_description="I generate new ideas and alternative approaches. I break conventional limits and explore possibilities.",
            thinking_mode="creative_generative",
            cognitive_focus="Divergent thinking and creativity",
            output_style="Novel ideas, innovative solutions, alternative thinking",
            timing_config=THINKING_TIMING_CONFIGS[direction],
            reasoning_level=2,
            memory_enabled=True,  # Creativity may require memory combination
        )

    def _get_specific_instructions(self) -> list[str]:
        """Return the creative guidelines, optional research tips and techniques."""
        guidelines = [
            "",
            "CREATIVE INNOVATION GUIDELINES:",
            "• Provide 3-5 specific creative ideas that could work",
            "• Use phrases like 'What if...', 'Another approach could be...', 'An alternative is...'",
            "• Generate new ideas, alternatives, and creative solutions",
            "• Think laterally - explore unconventional approaches",
            "• Break normal thinking patterns and assumptions",
            "• Suggest modifications, improvements, or entirely new approaches",
        ]

        # Only advertised when ExaTools is importable and enabled
        research = [
            "",
            "RESEARCH FOR CREATIVE INSPIRATION:",
            "• Search for innovative solutions in different industries",
            "• Look for creative approaches used in unrelated fields",
            "• Find emerging trends and new methodologies",
            "• Research breakthrough innovations and creative disruptions",
        ]

        techniques = [
            "",
            "CREATIVE TECHNIQUES TO USE:",
            "- Lateral thinking and analogies",
            "- Random word associations",
            "- 'What if' scenarios and thought experiments",
            "- Reversal thinking (what's the opposite?)",
            "- Combination of unrelated elements",
            "- Alternative perspectives and viewpoints",
            "",
            "Note: Quantity over quality - generate many ideas without judgment.",
        ]

        if EXA_AVAILABLE and ExaTools is not None:
            return guidelines + research + techniques
        return guidelines + techniques
456 | 
457 | 
class SynthesisThinkingCapability(ThinkingCapability):
    """Capability profile for the synthesis direction (metacognition and integration)."""

    def __init__(self) -> None:
        """Register the fixed synthesis profile with the base capability."""
        direction = ThinkingDirection.SYNTHESIS
        profile = {
            "thinking_direction": direction,
            "role": "Metacognitive Orchestrator",
            "description": "Thinking process management and comprehensive coordination",
            "role_description": "I integrate all perspectives and provide the final balanced answer. My output is what users see - it must be practical and human-friendly.",
            "thinking_mode": "metacognitive_synthetic",
            "cognitive_focus": "Metacognition and executive control",
            "output_style": "Comprehensive integration, process management, unified conclusions",
            "timing_config": THINKING_TIMING_CONFIGS[direction],
            # Highest level of metacognition
            "reasoning_level": 3,
            # Need to remember all other thinking direction results
            "memory_enabled": True,
        }
        super().__init__(**profile)

    def _get_specific_instructions(self) -> list[str]:
        """Return the synthesis-specific instruction lines as a new list.

        Each section opens with an empty string that acts as a blank
        separator line.
        """
        guidelines = [
            "",
            "STRATEGIC SYNTHESIS GUIDELINES:",
            "• Your primary goal: Answer the original question using insights from other analyses",
            "• Avoid generic rehashing - focus specifically on the question asked",
            "• Use other analyses' contributions as evidence/perspectives to build your answer",
            "• Provide practical, actionable insights users can understand",
        ]
        approach = [
            "",
            "CRITICAL QUESTION-FOCUSED APPROACH:",
            "1. Extract insights from other analyses that directly address the question",
            "2. Ignore generic statements - focus on question-relevant content",
            "3. Build a coherent answer that uses multiple perspectives as support",
            "4. End with a clear, direct response to what was originally asked",
        ]
        responsibilities = [
            "",
            "KEY RESPONSIBILITIES:",
            "- Return to the original question and answer it directly",
            "- Use other analyses' insights as building blocks for your answer",
            "- Synthesize perspectives into a cohesive response to the specific question",
            "- Avoid academic summarization - focus on practical question-answering",
            "- Ensure your entire response serves the original question",
        ]
        output_requirements = [
            "",
            "FINAL OUTPUT REQUIREMENTS:",
            "• This is the user's ONLY answer - it must directly address their question",
            "• Don't just summarize - synthesize into a clear answer",
            "• Remove content that doesn't directly relate to the original question",
            "• For philosophical questions: provide thoughtful answers, not just analysis",
            "• You may mention different analysis types, perspectives, or methodologies if relevant",
            "• Present your synthesis process transparently, showing how different viewpoints contribute",
        ]
        return [*guidelines, *approach, *responsibilities, *output_requirements]
506 | 
class MultiThinkingAgentFactory:
    """Factory that builds and caches one Agent per thinking direction.

    Agents are cached under a key made of the direction value, the model's
    class name and a hash of the context string. On a cache hit the agent is
    reused and only its instructions are refreshed.
    """

    # Thinking direction capability mapping
    THINKING_CAPABILITIES = {
        ThinkingDirection.FACTUAL: FactualThinkingCapability(),
        ThinkingDirection.EMOTIONAL: EmotionalThinkingCapability(),
        ThinkingDirection.CRITICAL: CriticalThinkingCapability(),
        ThinkingDirection.OPTIMISTIC: OptimisticThinkingCapability(),
        ThinkingDirection.CREATIVE: CreativeThinkingCapability(),
        ThinkingDirection.SYNTHESIS: SynthesisThinkingCapability(),
    }

    # Upper bound on cached agents. The cache key embeds a hash of the
    # caller-supplied context, so every distinct context adds an entry; without
    # a bound the cache would grow for the lifetime of the process.
    MAX_CACHED_AGENTS = 128

    def __init__(self) -> None:
        """Start with an empty agent cache."""
        self._agent_cache: dict[str, Agent] = {}  # Cache for created agents

    def create_thinking_agent(
        self,
        thinking_direction: ThinkingDirection,
        model: Model,
        context: str = "",
        previous_results: dict | None = None,
        **kwargs,
    ) -> Agent:
        """Create (or reuse) the Agent for a specific thinking direction.

        Args:
            thinking_direction: Direction whose capability profile is used.
            model: Model handed to a newly built agent.
            context: Context text used for the instructions and the cache key.
            previous_results: Earlier direction outputs passed to the
                capability's instruction builder.
            **kwargs: Extra keyword arguments forwarded to ``Agent``. Agents
                built with extra arguments are neither read from nor written
                to the cache, because the cache key does not describe them.

        Returns:
            The agent configured for ``thinking_direction``.

        Raises:
            KeyError: If ``thinking_direction`` has no registered capability.
        """
        capability = self.THINKING_CAPABILITIES[thinking_direction]
        instructions = capability.get_instructions(context, previous_results)

        # A cached agent cannot honour per-call Agent kwargs, so only calls
        # without extra kwargs take part in caching.
        cacheable = not kwargs

        # Generate cache key
        cache_key = (
            f"{thinking_direction.value}_{model.__class__.__name__}_{hash(context)}"
        )

        if cacheable:
            cached_agent = self._agent_cache.get(cache_key)
            if cached_agent is not None:
                # Return cached agent but update instructions
                # NOTE(review): the cached agent keeps the model it was built
                # with; only the model's class name is part of the key.
                cached_agent.instructions = instructions
                return cached_agent

        # Create new agent
        agent = Agent(
            name=f"{thinking_direction.value.title()}AnalysisAgent",
            role=capability.role,
            description=capability.description,
            model=model,
            tools=capability.tools or None,
            instructions=instructions,
            markdown=True,
            **kwargs,
        )

        # Add special configuration
        if capability.memory_enabled:
            agent.enable_user_memories = True

        if cacheable:
            self._store_agent(cache_key, agent)

        logger.info(
            "Created %s thinking agent with %ss time limit",
            thinking_direction.value,
            capability.timing_config.default_time_seconds,
        )
        return agent

    def _store_agent(self, cache_key: str, agent: Agent) -> None:
        """Cache ``agent``, evicting the oldest entries once the bound is hit."""
        cache = self._agent_cache
        # dicts preserve insertion order, so the first key is the oldest entry.
        while cache and len(cache) >= self.MAX_CACHED_AGENTS:
            del cache[next(iter(cache))]
        cache[cache_key] = agent

    def get_thinking_timing(
        self, thinking_direction: ThinkingDirection
    ) -> ThinkingTimingConfig:
        """Get specific thinking direction timing configuration."""
        return THINKING_TIMING_CONFIGS[thinking_direction]

    def get_all_thinking_directions(self) -> list[ThinkingDirection]:
        """Get all available thinking directions."""
        return list(self.THINKING_CAPABILITIES)

    def clear_cache(self) -> None:
        """Clear agent cache."""
        self._agent_cache.clear()
        logger.info("Thinking agent cache cleared")
583 | 
584 | 
585 | # Global factory instance
586 | _thinking_factory = MultiThinkingAgentFactory()
587 | 
588 | 
589 | # Convenience functions
def create_thinking_agent(
    thinking_direction: ThinkingDirection, model: Model, **kwargs
) -> Agent:
    """Build a thinking agent through the module-level shared factory.

    Extra keyword arguments are forwarded unchanged to the factory's
    ``create_thinking_agent`` method.
    """
    factory = _thinking_factory
    return factory.create_thinking_agent(thinking_direction, model, **kwargs)
595 | 
596 | 
def get_thinking_timing(thinking_direction: ThinkingDirection) -> ThinkingTimingConfig:
    """Look up a direction's timing configuration via the shared factory."""
    factory = _thinking_factory
    timing = factory.get_thinking_timing(thinking_direction)
    return timing
600 | 
601 | 
def get_all_thinking_directions() -> list[ThinkingDirection]:
    """List every thinking direction known to the shared factory."""
    factory = _thinking_factory
    directions = factory.get_all_thinking_directions()
    return directions
605 | 
```

--------------------------------------------------------------------------------
/src/mcp_server_mas_sequential_thinking/processors/multi_thinking_processor.py:
--------------------------------------------------------------------------------

```python
  1 | """Multi-Thinking Sequential Processor.
  2 | 
  3 | Implements a parallel processor based on multi-directional thinking methodology,
  4 | integrated with the Agno Workflow system. Supports intelligent parallel processing
  5 | from single direction to full multi-direction analysis.
  6 | """
  7 | 
  8 | # Lazy import to break circular dependency
  9 | import logging
 10 | import time
 11 | from dataclasses import dataclass
 12 | from typing import TYPE_CHECKING, Any
 13 | 
 14 | from agno.workflow.types import StepOutput
 15 | 
 16 | if TYPE_CHECKING:
 17 |     from mcp_server_mas_sequential_thinking.core.models import ThoughtData
 18 | 
 19 | logger = logging.getLogger(__name__)
 20 | from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config
 21 | from mcp_server_mas_sequential_thinking.routing.multi_thinking_router import (
 22 |     MultiThinkingIntelligentRouter,
 23 |     RoutingDecision,
 24 | )
 25 | 
 26 | from .multi_thinking_core import (
 27 |     MultiThinkingAgentFactory,
 28 |     ProcessingDepth,
 29 |     ThinkingDirection,
 30 | )
 31 | 
 32 | # logger already defined above
 33 | 
 34 | 
 35 | @dataclass
 36 | class MultiThinkingProcessingResult:
 37 |     """Multi-thinking processing result."""
 38 | 
 39 |     content: str
 40 |     strategy_used: str
 41 |     thinking_sequence: list[str]
 42 |     processing_time: float
 43 |     complexity_score: float
 44 |     cost_reduction: float
 45 |     individual_results: dict[str, str]  # Results from each thinking direction
 46 |     step_name: str
 47 | 
 48 | 
 49 | class MultiThinkingSequentialProcessor:
 50 |     """Multi-thinking parallel processor."""
 51 | 
 52 |     def __init__(self) -> None:
 53 |         self.model_config = get_model_config()
 54 |         self.thinking_factory = MultiThinkingAgentFactory()
 55 |         self.router = MultiThinkingIntelligentRouter()
 56 | 
 57 |     async def process_with_multi_thinking(
 58 |         self, thought_data: "ThoughtData", context_prompt: str = ""
 59 |     ) -> MultiThinkingProcessingResult:
 60 |         """Process thoughts using multi-thinking methodology with parallel execution."""
 61 |         start_time = time.time()
 62 | 
 63 |         logger.info("Multi-thinking processing started")
 64 |         if logger.isEnabledFor(logging.INFO):
 65 |             logger.info("Input preview: %s", thought_data.thought[:100])
 66 |             logger.info("Context length: %d chars", len(context_prompt))
 67 | 
 68 |         try:
 69 |             # Step 1: Intelligent routing decision
 70 |             routing_decision = await self.router.route_thought(thought_data)
 71 | 
 72 |             logger.info("Selected strategy: %s", routing_decision.strategy.name)
 73 |             if logger.isEnabledFor(logging.INFO):
 74 |                 sequence = [
 75 |                     direction.value
 76 |                     for direction in routing_decision.strategy.thinking_sequence
 77 |                 ]
 78 |                 logger.info("Thinking sequence: %s", sequence)
 79 | 
 80 |             # Step 2: Execute processing based on complexity
 81 |             if routing_decision.strategy.complexity == ProcessingDepth.SINGLE:
 82 |                 result = await self._process_single_direction(
 83 |                     thought_data, context_prompt, routing_decision
 84 |                 )
 85 |             elif routing_decision.strategy.complexity == ProcessingDepth.DOUBLE:
 86 |                 result = await self._process_double_direction_sequence(
 87 |                     thought_data, context_prompt, routing_decision
 88 |                 )
 89 |             elif routing_decision.strategy.complexity == ProcessingDepth.TRIPLE:
 90 |                 result = await self._process_triple_direction_sequence(
 91 |                     thought_data, context_prompt, routing_decision
 92 |                 )
 93 |             else:  # FULL
 94 |                 result = await self._process_full_direction_sequence(
 95 |                     thought_data, context_prompt, routing_decision
 96 |                 )
 97 | 
 98 |             processing_time = time.time() - start_time
 99 | 
100 |             # Create final result
101 |             final_result = MultiThinkingProcessingResult(
102 |                 content=result["final_content"],
103 |                 strategy_used=routing_decision.strategy.name,
104 |                 thinking_sequence=[
105 |                     direction.value
106 |                     for direction in routing_decision.strategy.thinking_sequence
107 |                 ],
108 |                 processing_time=processing_time,
109 |                 complexity_score=routing_decision.complexity_metrics.complexity_score,
110 |                 cost_reduction=routing_decision.estimated_cost_reduction,
111 |                 individual_results=result.get("individual_results", {}),
112 |                 step_name="multi_thinking_processing",
113 |             )
114 | 
115 |             logger.info(
116 |                 "Multi-thinking processing completed - Time: %.3fs, Cost reduction: %.1f%%, Output: %d chars",
117 |                 processing_time,
118 |                 routing_decision.estimated_cost_reduction,
119 |                 len(final_result.content),
120 |             )
121 | 
122 |             return final_result
123 | 
124 |         except Exception as e:
125 |             processing_time = time.time() - start_time
126 |             logger.exception(
127 |                 f"Multi-thinking processing failed after {processing_time:.3f}s: {e}"
128 |             )
129 | 
130 |             return MultiThinkingProcessingResult(
131 |                 content=f"Multi-thinking processing failed: {e!s}",
132 |                 strategy_used="error_fallback",
133 |                 thinking_sequence=[],
134 |                 processing_time=processing_time,
135 |                 complexity_score=0.0,
136 |                 cost_reduction=0.0,
137 |                 individual_results={},
138 |                 step_name="error_handling",
139 |             )
140 | 
141 |     async def _process_single_direction(
142 |         self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
143 |     ) -> dict[str, Any]:
144 |         """Process single thinking direction mode."""
145 |         thinking_direction = decision.strategy.thinking_sequence[0]
146 |         logger.info(f"  SINGLE THINKING MODE: {thinking_direction.value}")
147 | 
148 |         # Use enhanced model for synthesis thinking, standard model for other directions
149 |         if thinking_direction == ThinkingDirection.SYNTHESIS:
150 |             model = self.model_config.create_enhanced_model()
151 |             logger.info("    Using enhanced model for synthesis thinking")
152 |         else:
153 |             model = self.model_config.create_standard_model()
154 |             logger.info("    Using standard model for focused thinking")
155 | 
156 |         agent = self.thinking_factory.create_thinking_agent(
157 |             thinking_direction, model, context, {}
158 |         )
159 | 
160 |         # Execute processing
161 |         result = await agent.arun(input=thought_data.thought)
162 | 
163 |         # Extract content
164 |         content = self._extract_content(result)
165 | 
166 |         return {
167 |             "final_content": content,
168 |             "individual_results": {thinking_direction.value: content},
169 |         }
170 | 
171 |     async def _process_double_direction_sequence(
172 |         self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
173 |     ) -> dict[str, Any]:
174 |         """Process dual thinking direction sequence with parallel execution."""
175 |         direction1, direction2 = decision.strategy.thinking_sequence
176 |         logger.info(
177 |             f"  DUAL THINKING SEQUENCE: {direction1.value} + {direction2.value} (parallel)"
178 |         )
179 | 
180 |         individual_results = {}
181 | 
182 |         # Check if synthesis agent is involved
183 |         has_synthesis = any(d == ThinkingDirection.SYNTHESIS for d in [direction1, direction2])
184 | 
185 |         if has_synthesis:
186 |             # If synthesis is involved, run non-synthesis agents in parallel, then synthesis
187 |             non_synthesis_directions = [d for d in [direction1, direction2] if d != ThinkingDirection.SYNTHESIS]
188 |             synthesis_direction = ThinkingDirection.SYNTHESIS
189 | 
190 |             # Run non-synthesis agents in parallel
191 |             import asyncio
192 |             tasks = []
193 |             for direction in non_synthesis_directions:
194 |                 model = self.model_config.create_standard_model()
195 |                 logger.info(f"    Using standard model for {direction.value} thinking (parallel)")
196 | 
197 |                 agent = self.thinking_factory.create_thinking_agent(
198 |                     direction, model, context, {}
199 |                 )
200 |                 task = agent.arun(input=thought_data.thought)
201 |                 tasks.append((direction, task))
202 | 
203 |             # Execute parallel tasks
204 |             logger.info(f"    Executing {len(tasks)} thinking agents in parallel")
205 |             parallel_results = await asyncio.gather(*[task for _, task in tasks])
206 | 
207 |             # Process parallel results
208 |             for (direction, _), result in zip(tasks, parallel_results, strict=False):
209 |                 content = self._extract_content(result)
210 |                 individual_results[direction.value] = content
211 |                 logger.info(f"    {direction.value} thinking completed (parallel)")
212 | 
213 |             # Run synthesis agent with all parallel results
214 |             model = self.model_config.create_enhanced_model()
215 |             logger.info(f"    Using enhanced model for {synthesis_direction.value} synthesis")
216 | 
217 |             synthesis_agent = self.thinking_factory.create_thinking_agent(
218 |                 synthesis_direction, model, context, individual_results
219 |             )
220 | 
221 |             # Build synthesis input
222 |             synthesis_input = self._build_synthesis_integration_input(
223 |                 thought_data.thought, individual_results
224 |             )
225 | 
226 |             synthesis_result = await synthesis_agent.arun(input=synthesis_input)
227 |             synthesis_content = self._extract_content(synthesis_result)
228 |             individual_results[synthesis_direction.value] = synthesis_content
229 | 
230 |             logger.info(f"    {synthesis_direction.value} thinking completed")
231 | 
232 |             final_content = synthesis_content
233 |         else:
234 |             # No synthesis agent - run both agents in parallel
235 |             import asyncio
236 |             tasks = []
237 | 
238 |             for direction in [direction1, direction2]:
239 |                 model = self.model_config.create_standard_model()
240 |                 logger.info(f"    Using standard model for {direction.value} thinking (parallel)")
241 | 
242 |                 agent = self.thinking_factory.create_thinking_agent(
243 |                     direction, model, context, {}
244 |                 )
245 |                 task = agent.arun(input=thought_data.thought)
246 |                 tasks.append((direction, task))
247 | 
248 |             # Execute parallel tasks
249 |             logger.info("    Executing 2 thinking agents in parallel")
250 |             parallel_results = await asyncio.gather(*[task for _, task in tasks])
251 | 
252 |             # Process parallel results
253 |             for (direction, _), result in zip(tasks, parallel_results, strict=False):
254 |                 content = self._extract_content(result)
255 |                 individual_results[direction.value] = content
256 |                 logger.info(f"    {direction.value} thinking completed (parallel)")
257 | 
258 |             # Combine results programmatically
259 |             final_content = self._combine_dual_thinking_results(
260 |                 direction1, individual_results[direction1.value],
261 |                 direction2, individual_results[direction2.value],
262 |                 thought_data.thought
263 |             )
264 | 
265 |         return {
266 |             "final_content": final_content,
267 |             "individual_results": individual_results,
268 |         }
269 | 
270 |     async def _process_triple_direction_sequence(
271 |         self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
272 |     ) -> dict[str, Any]:
273 |         """Process triple thinking direction sequence with parallel execution."""
274 |         thinking_sequence = decision.strategy.thinking_sequence
275 |         logger.info(
276 |             f"  TRIPLE THINKING SEQUENCE: {' + '.join(direction.value for direction in thinking_sequence)} (parallel)"
277 |         )
278 | 
279 |         individual_results = {}
280 | 
281 |         # Triple strategy currently uses FACTUAL + CREATIVE + CRITICAL - all run in parallel
282 |         import asyncio
283 |         tasks = []
284 | 
285 |         for thinking_direction in thinking_sequence:
286 |             logger.info(f"    Preparing {thinking_direction.value} thinking for parallel execution")
287 | 
288 |             # All agents use standard model (no synthesis in triple strategy)
289 |             model = self.model_config.create_standard_model()
290 |             logger.info(f"      Using standard model for {thinking_direction.value} thinking")
291 | 
292 |             agent = self.thinking_factory.create_thinking_agent(
293 |                 thinking_direction, model, context, {}
294 |             )
295 | 
296 |             # All agents receive original input directly (parallel processing)
297 |             task = agent.arun(input=thought_data.thought)
298 |             tasks.append((thinking_direction, task))
299 | 
300 |         # Execute all thinking directions in parallel
301 |         logger.info(f"    Executing {len(tasks)} thinking agents in parallel")
302 |         parallel_results = await asyncio.gather(*[task for _, task in tasks])
303 | 
304 |         # Process parallel results
305 |         for (thinking_direction, _), result in zip(tasks, parallel_results, strict=False):
306 |             content = self._extract_content(result)
307 |             individual_results[thinking_direction.value] = content
308 |             logger.info(f"      {thinking_direction.value} thinking completed (parallel)")
309 | 
310 |         # Create programmatic synthesis (no synthesis agent in triple strategy)
311 |         final_content = self._synthesize_triple_thinking_results(
312 |             individual_results, thinking_sequence, thought_data.thought
313 |         )
314 | 
315 |         return {
316 |             "final_content": final_content,
317 |             "individual_results": individual_results,
318 |         }
319 | 
320 |     async def _process_full_direction_sequence(
321 |         self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
322 |     ) -> dict[str, Any]:
323 |         """Process full multi-thinking direction sequence with parallel execution."""
324 |         thinking_sequence = decision.strategy.thinking_sequence
325 |         logger.info(
326 |             "  FULL THINKING SEQUENCE: Initial orchestration -> Parallel processing -> Final synthesis"
327 |         )
328 | 
329 |         individual_results = {}
330 | 
331 |         # Step 1: Initial SYNTHESIS for orchestration (if first agent is SYNTHESIS)
332 |         if thinking_sequence[0] == ThinkingDirection.SYNTHESIS:
333 |             logger.info("    Step 1: Initial synthesis orchestration")
334 | 
335 |             initial_synthesis_model = self.model_config.create_enhanced_model()
336 |             logger.info("      Using enhanced model for initial orchestration")
337 | 
338 |             initial_synthesis_agent = self.thinking_factory.create_thinking_agent(
339 |                 ThinkingDirection.SYNTHESIS, initial_synthesis_model, context, {}
340 |             )
341 | 
342 |             initial_result = await initial_synthesis_agent.arun(input=thought_data.thought)
343 |             initial_content = self._extract_content(initial_result)
344 |             individual_results["synthesis_initial"] = initial_content
345 | 
346 |             logger.info("      Initial orchestration completed")
347 | 
348 |         # Step 2: Identify non-synthesis agents for parallel execution
349 |         non_synthesis_agents = [
350 |             direction for direction in thinking_sequence
351 |             if direction != ThinkingDirection.SYNTHESIS
352 |         ]
353 | 
354 |         if non_synthesis_agents:
355 |             logger.info(f"    Step 2: Parallel execution of {len(non_synthesis_agents)} thinking agents")
356 | 
357 |             import asyncio
358 |             tasks = []
359 | 
360 |             for thinking_direction in non_synthesis_agents:
361 |                 logger.info(f"      Preparing {thinking_direction.value} thinking for parallel execution")
362 | 
363 |                 model = self.model_config.create_standard_model()
364 |                 logger.info(f"        Using standard model for {thinking_direction.value} thinking")
365 | 
366 |                 agent = self.thinking_factory.create_thinking_agent(
367 |                     thinking_direction, model, context, {}
368 |                 )
369 | 
370 |                 # All non-synthesis agents receive original input (parallel processing)
371 |                 task = agent.arun(input=thought_data.thought)
372 |                 tasks.append((thinking_direction, task))
373 | 
374 |             # Execute all non-synthesis agents in parallel
375 |             logger.info(f"      Executing {len(tasks)} thinking agents in parallel")
376 |             parallel_results = await asyncio.gather(*[task for _, task in tasks])
377 | 
378 |             # Process parallel results
379 |             for (thinking_direction, _), result in zip(tasks, parallel_results, strict=False):
380 |                 content = self._extract_content(result)
381 |                 individual_results[thinking_direction.value] = content
382 |                 logger.info(f"        {thinking_direction.value} thinking completed (parallel)")
383 | 
384 |         # Step 3: Final SYNTHESIS for integration (if last agent is SYNTHESIS)
385 |         final_synthesis_agents = [
386 |             i for i, direction in enumerate(thinking_sequence)
387 |             if direction == ThinkingDirection.SYNTHESIS and i > 0
388 |         ]
389 | 
390 |         if final_synthesis_agents:
391 |             logger.info("    Step 3: Final synthesis integration")
392 | 
393 |             final_synthesis_model = self.model_config.create_enhanced_model()
394 |             logger.info("      Using enhanced model for final synthesis")
395 | 
396 |             # Remove initial synthesis from results for final integration
397 |             integration_results = {
398 |                 k: v for k, v in individual_results.items()
399 |                 if not k.startswith("synthesis_")
400 |             }
401 | 
402 |             final_synthesis_agent = self.thinking_factory.create_thinking_agent(
403 |                 ThinkingDirection.SYNTHESIS, final_synthesis_model, context, integration_results
404 |             )
405 | 
406 |             # Build synthesis integration input
407 |             synthesis_input = self._build_synthesis_integration_input(
408 |                 thought_data.thought, integration_results
409 |             )
410 | 
411 |             final_result = await final_synthesis_agent.arun(input=synthesis_input)
412 |             final_content = self._extract_content(final_result)
413 |             individual_results["synthesis_final"] = final_content
414 | 
415 |             logger.info("      Final synthesis integration completed")
416 | 
417 |             # Use final synthesis result as the answer
418 |             final_answer = final_content
419 |         else:
420 |             # No final synthesis - create programmatic synthesis
421 |             final_answer = self._synthesize_full_thinking_results(
422 |                 individual_results, thinking_sequence, thought_data.thought
423 |             )
424 | 
425 |         return {
426 |             "final_content": final_answer,
427 |             "individual_results": individual_results,
428 |         }
429 | 
430 |     def _extract_content(self, result: Any) -> str:
431 |         """Extract content from agent execution result."""
432 |         if hasattr(result, "content"):
433 |             content = result.content
434 |             if isinstance(content, str):
435 |                 return content.strip()
436 |             return str(content).strip()
437 |         if isinstance(result, str):
438 |             return result.strip()
439 |         return str(result).strip()
440 | 
441 |     def _build_sequential_input(
442 |         self,
443 |         original_thought: str,
444 |         previous_results: dict[str, str],
445 |         current_direction: ThinkingDirection,
446 |     ) -> str:
447 |         """Build input for sequential processing."""
448 |         input_parts = [f"Original thought: {original_thought}", ""]
449 | 
450 |         if previous_results:
451 |             input_parts.append("Previous analysis perspectives:")
452 |             for direction_name, content in previous_results.items():
453 |                 # Use generic descriptions instead of specific direction names
454 |                 perspective_name = self._get_generic_perspective_name(direction_name)
455 |                 input_parts.append(
456 |                     f"  {perspective_name}: {content[:200]}{'...' if len(content) > 200 else ''}"
457 |                 )
458 |             input_parts.append("")
459 | 
460 |         # Use generic instruction instead of direction-specific instruction
461 |         thinking_style = self._get_thinking_style_instruction(current_direction)
462 |         input_parts.append(f"Now analyze this from a {thinking_style} perspective.")
463 | 
464 |         return "\n".join(input_parts)
465 | 
466 |     def _build_synthesis_integration_input(
467 |         self, original_thought: str, all_results: dict[str, str]
468 |     ) -> str:
469 |         """Build synthesis integration input."""
470 |         input_parts = [
471 |             f"Original question: {original_thought}",
472 |             "",
473 |             "Collected insights from comprehensive analysis:",
474 |         ]
475 | 
476 |         for direction_name, content in all_results.items():
477 |             if (
478 |                 direction_name != "synthesis"
479 |             ):  # Avoid including previous synthesis results
480 |                 # Completely hide direction concepts, use generic analysis types
481 |                 perspective_name = self._get_generic_perspective_name(direction_name)
482 |                 input_parts.append(f"• {perspective_name}: {content}")
483 | 
484 |         input_parts.extend(
485 |             [
486 |                 "",
487 |                 "TASK: Synthesize all analysis insights into ONE comprehensive, unified answer.",
488 |                 "REQUIREMENTS:",
489 |                 "1. Provide a single, coherent response directly addressing the original question",
490 |                 "2. Integrate all insights naturally without mentioning different analysis types",
491 |                 "3. Do NOT list or separate different analysis perspectives in your response",
492 |                 "4. Do NOT use section headers or reference any specific analysis methods",
493 |                 "5. Do NOT mention 'direction', 'perspective', 'analysis type', or similar terms",
494 |                 "6. Write as a unified voice providing the final answer",
495 |                 "7. This will be the ONLY response the user sees - make it complete and standalone",
496 |                 "8. Your response should read as if it came from a single, integrated thought process",
497 |             ]
498 |         )
499 | 
500 |         return "\n".join(input_parts)
501 | 
502 |     def _combine_dual_thinking_results(
503 |         self,
504 |         direction1: ThinkingDirection,
505 |         content1: str,
506 |         direction2: ThinkingDirection,
507 |         content2: str,
508 |         original_thought: str,
509 |     ) -> str:
510 |         """Combine dual thinking direction results."""
511 |         # If the second is synthesis thinking, return its result directly (should already be synthesized)
512 |         if direction2 == ThinkingDirection.SYNTHESIS:
513 |             return content2
514 | 
515 |         # Otherwise create synthesized answer without mentioning analysis methods
516 |         if (
517 |             direction1 == ThinkingDirection.FACTUAL
518 |             and direction2 == ThinkingDirection.EMOTIONAL
519 |         ):
520 |             return f"Regarding '{original_thought}': A comprehensive analysis reveals both objective realities and human emotional responses. {content1.lower()} while also recognizing that {content2.lower()} These complementary insights suggest a balanced approach that considers both factual evidence and human experience."
521 |         if (
522 |             direction1 == ThinkingDirection.CRITICAL
523 |             and direction2 == ThinkingDirection.OPTIMISTIC
524 |         ):
525 |             return f"Considering '{original_thought}': A thorough evaluation identifies both important concerns and significant opportunities. {content1.lower().strip('.')} while also recognizing promising aspects: {content2.lower()} A measured approach would address the concerns while pursuing the benefits."
526 |         # Generic synthesis, completely hiding analysis structure
527 |         return f"Analyzing '{original_thought}': A comprehensive evaluation reveals multiple important insights. {content1.lower().strip('.')} Additionally, {content2.lower()} Integrating these findings provides a well-rounded understanding that addresses the question from multiple angles."
528 | 
529 |     def _synthesize_triple_thinking_results(
530 |         self,
531 |         results: dict[str, str],
532 |         thinking_sequence: list[ThinkingDirection],
533 |         original_thought: str,
534 |     ) -> str:
535 |         """Synthesize triple thinking direction results."""
536 |         # Create truly synthesized answer, hiding all analysis structure
537 |         content_pieces = []
538 |         for thinking_direction in thinking_sequence:
539 |             direction_name = thinking_direction.value
540 |             content = results.get(direction_name, "")
541 |             if content:
542 |                 # Extract core insights, completely hiding sources
543 |                 clean_content = content.strip().rstrip(".!")
544 |                 content_pieces.append(clean_content)
545 | 
546 |         if len(content_pieces) >= 3:
547 |             # Synthesis of three or more perspectives, completely unified
548 |             return f"""Considering the question '{original_thought}', a comprehensive analysis reveals several crucial insights.
549 | 
550 | {content_pieces[0].lower()}, which establishes the foundation for understanding. This leads to recognizing that {content_pieces[1].lower()}, adding essential depth to our comprehension. Furthermore, {content_pieces[2].lower() if len(content_pieces) > 2 else ""}
551 | 
552 | Drawing these insights together, the answer emerges as a unified understanding that acknowledges the full complexity while providing clear guidance."""
553 |         if len(content_pieces) == 2:
554 |             return f"Addressing '{original_thought}': A thorough evaluation shows that {content_pieces[0].lower()}, and importantly, {content_pieces[1].lower()} Together, these insights form a comprehensive understanding."
555 |         if len(content_pieces) == 1:
556 |             return f"Regarding '{original_thought}': {content_pieces[0]}"
557 |         return f"After comprehensive consideration of '{original_thought}', the analysis suggests this question merits deeper exploration to provide a complete answer."
558 | 
559 |     def _synthesize_full_thinking_results(
560 |         self,
561 |         results: dict[str, str],
562 |         thinking_sequence: list[ThinkingDirection],
563 |         original_thought: str,
564 |     ) -> str:
565 |         """Synthesize full multi-thinking results."""
566 |         # If there's a synthesis result, use it preferentially
567 |         synthesis_result = results.get("synthesis")
568 |         if synthesis_result:
569 |             return synthesis_result
570 | 
571 |         # Otherwise create synthesis
572 |         return self._synthesize_triple_thinking_results(
573 |             results, thinking_sequence, original_thought
574 |         )
575 | 
576 |     def _get_thinking_contribution(self, thinking_direction: ThinkingDirection) -> str:
577 |         """Get thinking direction contribution description."""
578 |         contributions = {
579 |             ThinkingDirection.FACTUAL: "factual information and objective data",
580 |             ThinkingDirection.EMOTIONAL: "emotional insights and intuitive responses",
581 |             ThinkingDirection.CRITICAL: "critical analysis and risk assessment",
582 |             ThinkingDirection.OPTIMISTIC: "positive possibilities and value identification",
583 |             ThinkingDirection.CREATIVE: "creative alternatives and innovative solutions",
584 |             ThinkingDirection.SYNTHESIS: "process management and integrated thinking",
585 |         }
586 |         return contributions.get(thinking_direction, "specialized thinking")
587 | 
588 |     def _get_generic_perspective_name(self, direction_name: str) -> str:
589 |         """Get generic analysis type name for thinking direction, hiding direction concepts."""
590 |         name_mapping = {
591 |             "factual": "Factual analysis",
592 |             "emotional": "Emotional considerations",
593 |             "critical": "Risk assessment",
594 |             "optimistic": "Opportunity analysis",
595 |             "creative": "Creative exploration",
596 |             "synthesis": "Strategic synthesis",
597 |         }
598 |         return name_mapping.get(direction_name.lower(), "Analysis")
599 | 
600 |     def _get_thinking_style_instruction(
601 |         self, thinking_direction: ThinkingDirection
602 |     ) -> str:
603 |         """Get thinking style instruction, avoiding mention of direction concepts."""
604 |         style_mapping = {
605 |             ThinkingDirection.FACTUAL: "factual and objective",
606 |             ThinkingDirection.EMOTIONAL: "emotional and intuitive",
607 |             ThinkingDirection.CRITICAL: "critical and cautious",
608 |             ThinkingDirection.OPTIMISTIC: "positive and optimistic",
609 |             ThinkingDirection.CREATIVE: "creative and innovative",
610 |             ThinkingDirection.SYNTHESIS: "strategic and integrative",
611 |         }
612 |         return style_mapping.get(thinking_direction, "analytical")
613 | 
614 | 
# Module-level processor instance, created once when this module is imported
# and reused by the process_with_multi_thinking() convenience function.
_multi_thinking_processor = MultiThinkingSequentialProcessor()
617 | 
618 | 
619 | # Convenience function
async def process_with_multi_thinking(
    thought_data: "ThoughtData", context: str = ""
) -> MultiThinkingProcessingResult:
    """Process a thought through the shared module-level processor.

    Forwards ``thought_data`` and ``context`` positionally to the
    ``process_with_multi_thinking`` method of ``_multi_thinking_processor``
    and returns the awaited result.
    """
    shared_processor = _multi_thinking_processor
    outcome = await shared_processor.process_with_multi_thinking(thought_data, context)
    return outcome
627 | 
628 | 
def create_multi_thinking_step_output(
    result: MultiThinkingProcessingResult,
) -> StepOutput:
    """Wrap a multi-thinking processing result in an Agno ``StepOutput``.

    The result's ``content`` and ``step_name`` are copied over.
    """
    step_fields = {
        "content": result.content,
        # NOTE(review): success is always reported as True here, regardless
        # of what the result contains.
        "success": True,
        "step_name": result.step_name,
    }
    return StepOutput(**step_fields)
638 | 
```
Page 2/2 · First · Prev · Next · Last