This is page 2 of 2. Use http://codebase.md/fradser/mcp-server-mas-sequential-thinking?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .env.example ├── .gitignore ├── .python-version ├── CHANGELOG.md ├── CLAUDE.md ├── Dockerfile ├── pyproject.toml ├── README.md ├── README.zh-CN.md ├── smithery.yaml ├── src │ └── mcp_server_mas_sequential_thinking │ ├── __init__.py │ ├── config │ │ ├── __init__.py │ │ ├── constants.py │ │ └── modernized_config.py │ ├── core │ │ ├── __init__.py │ │ ├── models.py │ │ ├── session.py │ │ └── types.py │ ├── infrastructure │ │ ├── __init__.py │ │ ├── logging_config.py │ │ └── persistent_memory.py │ ├── main.py │ ├── processors │ │ ├── __init__.py │ │ ├── multi_thinking_core.py │ │ └── multi_thinking_processor.py │ ├── routing │ │ ├── __init__.py │ │ ├── agno_workflow_router.py │ │ ├── ai_complexity_analyzer.py │ │ ├── complexity_types.py │ │ └── multi_thinking_router.py │ ├── services │ │ ├── __init__.py │ │ ├── context_builder.py │ │ ├── processing_orchestrator.py │ │ ├── response_formatter.py │ │ ├── response_processor.py │ │ ├── retry_handler.py │ │ ├── server_core.py │ │ ├── thought_processor_refactored.py │ │ └── workflow_executor.py │ └── utils │ ├── __init__.py │ └── utils.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/services/server_core.py: -------------------------------------------------------------------------------- ```python 1 | """Refactored server core with separated concerns and reduced complexity.""" 2 | 3 | import os 4 | from abc import ABC, abstractmethod 5 | from collections.abc import AsyncIterator 6 | from contextlib import asynccontextmanager 7 | from dataclasses import dataclass 8 | from pathlib import Path 9 | from typing import Any 10 | 11 | # Lazy import to break circular dependency 12 | from pydantic import ValidationError 13 | 14 | from mcp_server_mas_sequential_thinking.config 
import ( 15 | DefaultTimeouts, 16 | DefaultValues, 17 | PerformanceMetrics, 18 | check_required_api_keys, 19 | ) 20 | from mcp_server_mas_sequential_thinking.core import ( 21 | ConfigurationError, 22 | SessionMemory, 23 | ThoughtData, 24 | ) 25 | from mcp_server_mas_sequential_thinking.utils import setup_logging 26 | 27 | logger = setup_logging() 28 | 29 | 30 | class LoggingMixin: 31 | """Mixin class providing common logging utilities with reduced duplication.""" 32 | 33 | @staticmethod 34 | def _log_section_header( 35 | title: str, separator_length: int = PerformanceMetrics.SEPARATOR_LENGTH 36 | ) -> None: 37 | """Log a formatted section header.""" 38 | logger.info(f"{title}") 39 | 40 | @staticmethod 41 | def _log_metrics_block(title: str, metrics: dict[str, Any]) -> None: 42 | """Log a formatted metrics block.""" 43 | logger.info(f"{title}") 44 | for key, value in metrics.items(): 45 | if isinstance(value, float): 46 | logger.info(f" {key}: {value:.2f}") 47 | elif isinstance(value, (int, str)): 48 | logger.info(f" {key}: {value}") 49 | else: 50 | logger.info(f" {key}: {value}") 51 | 52 | @staticmethod 53 | def _log_separator(length: int = PerformanceMetrics.SEPARATOR_LENGTH) -> None: 54 | """Log a separator line.""" 55 | logger.info(f" {'=' * length}") 56 | 57 | @staticmethod 58 | def _calculate_efficiency_score(processing_time: float) -> float: 59 | """Calculate efficiency score using standard metrics.""" 60 | return ( 61 | PerformanceMetrics.PERFECT_EFFICIENCY_SCORE 62 | if processing_time < PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD 63 | else max( 64 | PerformanceMetrics.MINIMUM_EFFICIENCY_SCORE, 65 | PerformanceMetrics.EFFICIENCY_TIME_THRESHOLD / processing_time, 66 | ) 67 | ) 68 | 69 | @staticmethod 70 | def _calculate_execution_consistency(success_indicator: bool) -> float: 71 | """Calculate execution consistency using standard metrics.""" 72 | return ( 73 | PerformanceMetrics.PERFECT_EXECUTION_CONSISTENCY 74 | if success_indicator 75 | else 
PerformanceMetrics.DEFAULT_EXECUTION_CONSISTENCY 76 | ) 77 | 78 | 79 | @dataclass(frozen=True, slots=True) 80 | class ServerConfig: 81 | """Immutable server configuration with clear defaults.""" 82 | 83 | provider: str 84 | team_mode: str = DefaultValues.DEFAULT_TEAM_MODE 85 | log_level: str = DefaultValues.DEFAULT_LOG_LEVEL 86 | max_retries: int = DefaultValues.DEFAULT_MAX_RETRIES 87 | timeout: float = DefaultTimeouts.PROCESSING_TIMEOUT 88 | 89 | @classmethod 90 | def from_environment(cls) -> "ServerConfig": 91 | """Create config from environment with sensible defaults.""" 92 | return cls( 93 | provider=os.environ.get("LLM_PROVIDER", DefaultValues.DEFAULT_LLM_PROVIDER), 94 | team_mode=os.environ.get( 95 | "TEAM_MODE", DefaultValues.DEFAULT_TEAM_MODE 96 | ).lower(), 97 | log_level=os.environ.get("LOG_LEVEL", DefaultValues.DEFAULT_LOG_LEVEL), 98 | max_retries=int( 99 | os.environ.get("MAX_RETRIES", str(DefaultValues.DEFAULT_MAX_RETRIES)) 100 | ), 101 | timeout=float( 102 | os.environ.get("TIMEOUT", str(DefaultValues.DEFAULT_TIMEOUT)) 103 | ), 104 | ) 105 | 106 | 107 | class ServerInitializer(ABC): 108 | """Abstract initializer for server components.""" 109 | 110 | @abstractmethod 111 | async def initialize(self, config: ServerConfig) -> None: 112 | """Initialize server component.""" 113 | 114 | @abstractmethod 115 | async def cleanup(self) -> None: 116 | """Clean up server component.""" 117 | 118 | 119 | class EnvironmentInitializer(ServerInitializer): 120 | """Handles environment validation and setup.""" 121 | 122 | async def initialize(self, config: ServerConfig) -> None: 123 | """Validate environment requirements with enhanced error handling.""" 124 | logger.info(f"Initializing environment with {config.provider} provider") 125 | 126 | try: 127 | # Graceful degradation prevents startup failures from missing optional keys 128 | missing_keys = check_required_api_keys() 129 | if missing_keys: 130 | logger.warning(f"Missing API keys: {', '.join(missing_keys)}") 131 | 
# Note: Don't fail here as some providers might not require keys 132 | 133 | # Ensure log directory exists 134 | log_dir = Path.home() / ".sequential_thinking" / "logs" 135 | if not log_dir.exists(): 136 | logger.info(f"Creating log directory: {log_dir}") 137 | try: 138 | log_dir.mkdir(parents=True, exist_ok=True) 139 | except OSError as e: 140 | raise ConfigurationError( 141 | f"Failed to create log directory {log_dir}: {e}" 142 | ) from e 143 | 144 | except Exception as e: 145 | if not isinstance(e, ConfigurationError): 146 | raise ConfigurationError( 147 | f"Environment initialization failed: {e}" 148 | ) from e 149 | raise 150 | 151 | async def cleanup(self) -> None: 152 | """No cleanup needed for environment.""" 153 | 154 | 155 | class ServerState: 156 | """Manages server state with proper lifecycle and separation of concerns.""" 157 | 158 | def __init__(self) -> None: 159 | self._config: ServerConfig | None = None 160 | self._session: SessionMemory | None = None 161 | self._initializers = [ 162 | EnvironmentInitializer(), 163 | ] 164 | 165 | async def initialize(self, config: ServerConfig) -> None: 166 | """Initialize all server components.""" 167 | self._config = config 168 | 169 | # Ordered initialization prevents dependency conflicts 170 | for initializer in self._initializers: 171 | await initializer.initialize(config) 172 | 173 | # Session-based architecture simplifies state management 174 | self._session = SessionMemory() 175 | 176 | logger.info( 177 | "Server state initialized successfully with multi-thinking workflow" 178 | ) 179 | 180 | async def cleanup(self) -> None: 181 | """Clean up all server components.""" 182 | # Clean up in reverse order 183 | for initializer in reversed(self._initializers): 184 | await initializer.cleanup() 185 | 186 | self._config = None 187 | self._session = None 188 | 189 | logger.info("Server state cleaned up") 190 | 191 | @property 192 | def config(self) -> ServerConfig: 193 | """Get current configuration.""" 194 | if 
self._config is None: 195 | raise RuntimeError("Server not initialized - config unavailable") 196 | return self._config 197 | 198 | @property 199 | def session(self) -> SessionMemory: 200 | """Get current session.""" 201 | if self._session is None: 202 | raise RuntimeError("Server not initialized - session unavailable") 203 | return self._session 204 | 205 | 206 | # Remove redundant exception definition as it's now in types.py 207 | 208 | 209 | class ThoughtProcessor: 210 | """Simplified thought processor that delegates to the refactored implementation. 211 | 212 | This maintains backward compatibility while using the new service-based architecture. 213 | """ 214 | 215 | def __init__(self, session: SessionMemory) -> None: 216 | # Import and delegate to the refactored implementation 217 | from .thought_processor_refactored import ( 218 | ThoughtProcessor as RefactoredProcessor, 219 | ) 220 | 221 | self._processor = RefactoredProcessor(session) 222 | 223 | async def process_thought(self, thought_data: ThoughtData) -> str: 224 | """Process a thought through the team with comprehensive error handling.""" 225 | return await self._processor.process_thought(thought_data) 226 | 227 | # Legacy methods for backward compatibility - delegate to refactored processor 228 | def _extract_response_content(self, response) -> str: 229 | """Legacy method - delegates to refactored processor.""" 230 | return self._processor._extract_response_content(response) 231 | 232 | def _build_context_prompt(self, thought_data: ThoughtData) -> str: 233 | """Legacy method - delegates to refactored processor.""" 234 | return self._processor._build_context_prompt(thought_data) 235 | 236 | def _format_response(self, content: str, thought_data: ThoughtData) -> str: 237 | """Legacy method - delegates to refactored processor.""" 238 | return self._processor._format_response(content, thought_data) 239 | 240 | async def _execute_single_agent_processing( 241 | self, input_prompt: str, routing_decision=None 
242 | ) -> str: 243 | """Legacy method - delegates to refactored processor.""" 244 | return await self._processor._execute_single_agent_processing( 245 | input_prompt, routing_decision 246 | ) 247 | 248 | async def _execute_team_processing(self, input_prompt: str) -> str: 249 | """Legacy method - delegates to refactored processor.""" 250 | return await self._processor._execute_team_processing(input_prompt) 251 | 252 | 253 | @asynccontextmanager 254 | async def create_server_lifespan() -> AsyncIterator[ServerState]: 255 | """Create server lifespan context manager with proper resource management.""" 256 | config = ServerConfig.from_environment() 257 | server_state = ServerState() 258 | 259 | try: 260 | await server_state.initialize(config) 261 | logger.info("Server started successfully") 262 | yield server_state 263 | 264 | except Exception as e: 265 | logger.error(f"Server initialization failed: {e}", exc_info=True) 266 | raise ServerInitializationError(f"Failed to initialize server: {e}") from e 267 | 268 | finally: 269 | await server_state.cleanup() 270 | logger.info("Server shutdown complete") 271 | 272 | 273 | class ServerInitializationError(Exception): 274 | """Custom exception for server initialization failures.""" 275 | 276 | 277 | def create_validated_thought_data( 278 | thought: str, 279 | thoughtNumber: int, 280 | totalThoughts: int, 281 | nextThoughtNeeded: bool, 282 | isRevision: bool, 283 | branchFromThought: int | None, 284 | branchId: str | None, 285 | needsMoreThoughts: bool, 286 | ) -> ThoughtData: 287 | """Create and validate thought data with enhanced error reporting.""" 288 | try: 289 | return ThoughtData( 290 | thought=thought.strip(), 291 | thoughtNumber=thoughtNumber, 292 | totalThoughts=totalThoughts, 293 | nextThoughtNeeded=nextThoughtNeeded, 294 | isRevision=isRevision, 295 | branchFromThought=branchFromThought, 296 | branchId=branchId.strip() if branchId else None, 297 | needsMoreThoughts=needsMoreThoughts, 298 | ) 299 | except 
ValidationError as e: 300 | raise ValueError(f"Invalid thought data: {e}") from e 301 | 302 | 303 | # Global server state with workflow support 304 | _server_state: ServerState | None = None 305 | _thought_processor: ThoughtProcessor | None = None 306 | 307 | 308 | async def get_thought_processor() -> ThoughtProcessor: 309 | """Get or create the global thought processor with workflow support.""" 310 | global _thought_processor, _server_state 311 | 312 | if _thought_processor is None: 313 | if _server_state is None: 314 | raise RuntimeError( 315 | "Server not properly initialized - _server_state is None. Ensure app_lifespan is running." 316 | ) 317 | 318 | logger.info("Initializing ThoughtProcessor with multi-thinking workflow") 319 | _thought_processor = ThoughtProcessor(_server_state.session) 320 | 321 | return _thought_processor 322 | ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/config/modernized_config.py: -------------------------------------------------------------------------------- ```python 1 | """Modernized configuration management with dependency injection and clean abstractions. 2 | 3 | Clean configuration management with modern Python patterns. 
4 | """ 5 | 6 | import os 7 | from abc import ABC, abstractmethod 8 | from dataclasses import dataclass 9 | from typing import Any, Protocol, runtime_checkable 10 | 11 | from agno.models.anthropic import Claude 12 | from agno.models.base import Model 13 | from agno.models.deepseek import DeepSeek 14 | from agno.models.groq import Groq 15 | from agno.models.ollama import Ollama 16 | from agno.models.openai import OpenAIChat 17 | from agno.models.openrouter import OpenRouter 18 | 19 | 20 | class GitHubOpenAI(OpenAIChat): 21 | """OpenAI provider configured for GitHub Models API with enhanced validation.""" 22 | 23 | @staticmethod 24 | def _validate_github_token(token: str) -> None: 25 | """Validate GitHub token format with comprehensive security checks.""" 26 | if not token: 27 | raise ValueError("GitHub token is required but not provided") 28 | 29 | # Strip whitespace and validate basic format 30 | token = token.strip() 31 | 32 | # Valid GitHub token prefixes with their expected lengths 33 | token_specs = { 34 | "ghp_": 40, # Classic personal access token 35 | "github_pat_": lambda t: len(t) >= 93, # Fine-grained PAT (variable length) 36 | "gho_": 40, # OAuth app token 37 | "ghu_": 40, # User-to-server token 38 | "ghs_": 40, # Server-to-server token 39 | } 40 | 41 | # Check token prefix and length 42 | valid_token = False 43 | for prefix, expected_length in token_specs.items(): 44 | if token.startswith(prefix): 45 | if callable(expected_length): 46 | if expected_length(token): 47 | valid_token = True 48 | break 49 | elif len(token) == expected_length: 50 | valid_token = True 51 | break 52 | 53 | if not valid_token: 54 | raise ValueError( 55 | "Invalid GitHub token format. Token must be a valid GitHub personal " 56 | "access token, " 57 | "OAuth token, or fine-grained personal access token with correct " 58 | "prefix and length." 
59 | ) 60 | 61 | # Enhanced entropy validation to prevent fake tokens 62 | token_body = ( 63 | token[4:] 64 | if token.startswith("ghp_") 65 | else token.split("_", 1)[1] 66 | if "_" in token 67 | else token 68 | ) 69 | 70 | # Check for minimum entropy (character diversity) 71 | unique_chars = len(set(token_body.lower())) 72 | if unique_chars < 15: # GitHub tokens should have high entropy 73 | raise ValueError( 74 | "GitHub token appears to have insufficient entropy. Please ensure " 75 | "you're using a real GitHub token." 76 | ) 77 | 78 | # Check for obvious fake patterns 79 | fake_patterns = ["test", "fake", "demo", "example", "placeholder", "your_token"] 80 | if any(pattern in token.lower() for pattern in fake_patterns): 81 | raise ValueError( 82 | "GitHub token appears to be a placeholder or test value. Please " 83 | "use a real GitHub token." 84 | ) 85 | 86 | def __init__(self, **kwargs: Any) -> None: 87 | # Set GitHub Models configuration 88 | kwargs.setdefault("base_url", "https://models.github.ai/inference") 89 | 90 | if "api_key" not in kwargs: 91 | kwargs["api_key"] = os.environ.get("GITHUB_TOKEN") 92 | 93 | api_key = kwargs.get("api_key") 94 | if not api_key: 95 | raise ValueError( 96 | "GitHub token is required but not found in GITHUB_TOKEN " 97 | "environment variable" 98 | ) 99 | 100 | if isinstance(api_key, str): 101 | self._validate_github_token(api_key) 102 | super().__init__(**kwargs) 103 | 104 | 105 | @dataclass(frozen=True) 106 | class ModelConfig: 107 | """Immutable configuration for model provider and settings.""" 108 | 109 | provider_class: type[Model] 110 | enhanced_model_id: str 111 | standard_model_id: str 112 | api_key: str | None = None 113 | 114 | def create_enhanced_model(self) -> Model: 115 | """Create enhanced model instance (used for complex synthesis like Blue Hat).""" 116 | # Anthropic-specific branch originally enabled prompt caching; that option has been removed, so caching is currently NOT enabled and both branches build the model the same way 117 | if self.provider_class == Claude: 118 | return self.provider_class( 119 | id=self.enhanced_model_id, 120 | #
Note: cache_system_prompt removed - not available in current Agno version 121 | ) 122 | return self.provider_class(id=self.enhanced_model_id) 123 | 124 | def create_standard_model(self) -> Model: 125 | """Create standard model instance (used for individual hat processing).""" 126 | # Anthropic-specific branch originally enabled prompt caching; that option has been removed (see note below), so caching is currently NOT enabled and both branches build the model the same way 127 | if self.provider_class == Claude: 128 | return self.provider_class( 129 | id=self.standard_model_id, 130 | # Note: cache_system_prompt removed - not available in current Agno version 131 | ) 132 | return self.provider_class(id=self.standard_model_id) 133 | 134 | 135 | @runtime_checkable 136 | class ConfigurationStrategy(Protocol): 137 | """Protocol defining configuration strategy interface.""" 138 | 139 | def get_config(self) -> ModelConfig: 140 | """Return model configuration for this strategy.""" 141 | ... 142 | 143 | def get_required_environment_variables(self) -> dict[str, bool]: 144 | """Return required environment variables and whether they're optional.""" 145 | ...
146 | 147 | 148 | class BaseProviderStrategy(ABC): 149 | """Abstract base strategy with common functionality.""" 150 | 151 | @property 152 | @abstractmethod 153 | def provider_class(self) -> type[Model]: 154 | """Return the provider model class.""" 155 | 156 | @property 157 | @abstractmethod 158 | def default_enhanced_model(self) -> str: 159 | """Return default enhanced model ID (for complex synthesis).""" 160 | 161 | @property 162 | @abstractmethod 163 | def default_standard_model(self) -> str: 164 | """Return default standard model ID (for individual processing).""" 165 | 166 | @property 167 | @abstractmethod 168 | def api_key_name(self) -> str | None: 169 | """Return API key environment variable name.""" 170 | 171 | def _get_env_with_fallback(self, env_var: str, fallback: str) -> str: 172 | """Get environment variable with fallback to default.""" 173 | value = os.environ.get(env_var, "").strip() 174 | return value if value else fallback 175 | 176 | def get_config(self) -> ModelConfig: 177 | """Get complete configuration with environment overrides.""" 178 | prefix = self.__class__.__name__.replace("Strategy", "").upper() 179 | 180 | enhanced_model = self._get_env_with_fallback( 181 | f"{prefix}_ENHANCED_MODEL_ID", self.default_enhanced_model 182 | ) 183 | standard_model = self._get_env_with_fallback( 184 | f"{prefix}_STANDARD_MODEL_ID", self.default_standard_model 185 | ) 186 | 187 | # Get API key with enhanced validation and None handling 188 | api_key: str | None = None 189 | if self.api_key_name: 190 | api_key = os.environ.get(self.api_key_name, "").strip() 191 | api_key = api_key if api_key else None 192 | 193 | return ModelConfig( 194 | provider_class=self.provider_class, 195 | enhanced_model_id=enhanced_model, 196 | standard_model_id=standard_model, 197 | api_key=api_key, 198 | ) 199 | 200 | def get_required_environment_variables(self) -> dict[str, bool]: 201 | """Return required environment variables.""" 202 | env_vars: dict[str, bool] = {} 203 | 204 | if 
self.api_key_name: 205 | env_vars[self.api_key_name] = False # Required 206 | 207 | # Enhanced/standard environment variables (optional) 208 | prefix = self.__class__.__name__.replace("Strategy", "").upper() 209 | env_vars[f"{prefix}_ENHANCED_MODEL_ID"] = True # Optional 210 | env_vars[f"{prefix}_STANDARD_MODEL_ID"] = True # Optional 211 | 212 | return env_vars 213 | 214 | 215 | # Concrete strategy implementations 216 | class DeepSeekStrategy(BaseProviderStrategy): 217 | """DeepSeek provider strategy.""" 218 | 219 | provider_class = DeepSeek 220 | default_enhanced_model = "deepseek-chat" 221 | default_standard_model = "deepseek-chat" 222 | api_key_name = "DEEPSEEK_API_KEY" 223 | 224 | 225 | class GroqStrategy(BaseProviderStrategy): 226 | """Groq provider strategy.""" 227 | 228 | provider_class = Groq 229 | default_enhanced_model = "openai/gpt-oss-120b" 230 | default_standard_model = "openai/gpt-oss-20b" 231 | api_key_name = "GROQ_API_KEY" 232 | 233 | 234 | class OpenRouterStrategy(BaseProviderStrategy): 235 | """OpenRouter provider strategy.""" 236 | 237 | provider_class = OpenRouter 238 | default_enhanced_model = "deepseek/deepseek-chat-v3-0324" 239 | default_standard_model = "deepseek/deepseek-r1" 240 | api_key_name = "OPENROUTER_API_KEY" 241 | 242 | 243 | class OllamaStrategy(BaseProviderStrategy): 244 | """Ollama provider strategy (no API key required).""" 245 | 246 | provider_class = Ollama 247 | default_enhanced_model = "devstral:24b" 248 | default_standard_model = "devstral:24b" 249 | api_key_name = None 250 | 251 | 252 | class GitHubStrategy(BaseProviderStrategy): 253 | """GitHub Models provider strategy.""" 254 | 255 | @property 256 | def provider_class(self) -> type[Model]: 257 | return GitHubOpenAI 258 | 259 | @property 260 | def default_enhanced_model(self) -> str: 261 | return "openai/gpt-5" 262 | 263 | @property 264 | def default_standard_model(self) -> str: 265 | return "openai/gpt-5-min" 266 | 267 | @property 268 | def api_key_name(self) -> str: 269 
| return "GITHUB_TOKEN" 270 | 271 | 272 | class AnthropicStrategy(BaseProviderStrategy): 273 | """Anthropic Claude provider strategy with prompt caching enabled.""" 274 | 275 | @property 276 | def provider_class(self) -> type[Model]: 277 | return Claude 278 | 279 | @property 280 | def default_enhanced_model(self) -> str: 281 | return "claude-3-5-sonnet-20241022" 282 | 283 | @property 284 | def default_standard_model(self) -> str: 285 | return "claude-3-5-haiku-20241022" 286 | 287 | @property 288 | def api_key_name(self) -> str: 289 | return "ANTHROPIC_API_KEY" 290 | 291 | 292 | class ConfigurationManager: 293 | """Manages configuration strategies with dependency injection.""" 294 | 295 | def __init__(self) -> None: 296 | self._strategies = { 297 | "deepseek": DeepSeekStrategy(), 298 | "groq": GroqStrategy(), 299 | "openrouter": OpenRouterStrategy(), 300 | "ollama": OllamaStrategy(), 301 | "github": GitHubStrategy(), 302 | "anthropic": AnthropicStrategy(), 303 | } 304 | self._default_strategy = "deepseek" 305 | 306 | def register_strategy(self, name: str, strategy: ConfigurationStrategy) -> None: 307 | """Register a new configuration strategy.""" 308 | self._strategies[name] = strategy 309 | 310 | def get_strategy(self, provider_name: str | None = None) -> ConfigurationStrategy: 311 | """Get strategy for specified provider.""" 312 | if provider_name is None: 313 | provider_name = os.environ.get("LLM_PROVIDER", self._default_strategy) 314 | 315 | provider_name = provider_name.lower() 316 | 317 | if provider_name not in self._strategies: 318 | available = list(self._strategies.keys()) 319 | raise ValueError( 320 | f"Unknown provider: {provider_name}. 
Available: {available}" 321 | ) 322 | 323 | return self._strategies[provider_name] 324 | 325 | def get_model_config(self, provider_name: str | None = None) -> ModelConfig: 326 | """Get model configuration using dependency injection.""" 327 | strategy = self.get_strategy(provider_name) 328 | return strategy.get_config() 329 | 330 | def validate_environment(self, provider_name: str | None = None) -> dict[str, str]: 331 | """Validate environment variables and return missing required ones.""" 332 | strategy = self.get_strategy(provider_name) 333 | required_vars = strategy.get_required_environment_variables() 334 | 335 | missing: dict[str, str] = {} 336 | for var_name, is_optional in required_vars.items(): 337 | if not is_optional and not os.environ.get(var_name): 338 | missing[var_name] = "Required but not set" 339 | 340 | # Check EXA API key for research functionality (optional) 341 | # Note: EXA tools will be disabled if key is not provided 342 | exa_key = os.environ.get("EXA_API_KEY") 343 | if not exa_key: 344 | # Don't fail startup - just log warning that research will be disabled 345 | import logging 346 | 347 | logging.getLogger(__name__).warning( 348 | "EXA_API_KEY not found. Research tools will be disabled." 
349 | ) 350 | 351 | return missing 352 | 353 | def get_available_providers(self) -> list[str]: 354 | """Get list of available provider names.""" 355 | return list(self._strategies.keys()) 356 | 357 | 358 | # Singleton instance for dependency injection 359 | _config_manager = ConfigurationManager() 360 | 361 | 362 | # Public API functions 363 | def get_model_config(provider_name: str | None = None) -> ModelConfig: 364 | """Get model configuration using modernized configuration management.""" 365 | return _config_manager.get_model_config(provider_name) 366 | 367 | 368 | def check_required_api_keys(provider_name: str | None = None) -> list[str]: 369 | """Check for missing required API keys.""" 370 | missing_vars = _config_manager.validate_environment(provider_name) 371 | return list(missing_vars.keys()) 372 | 373 | 374 | def register_provider_strategy(name: str, strategy: ConfigurationStrategy) -> None: 375 | """Register a custom provider strategy.""" 376 | _config_manager.register_strategy(name, strategy) 377 | 378 | 379 | def get_available_providers() -> list[str]: 380 | """Get list of available providers.""" 381 | return _config_manager.get_available_providers() 382 | ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/routing/agno_workflow_router.py: -------------------------------------------------------------------------------- ```python 1 | """Multi-Thinking Workflow Router - Complete Rewrite. 
2 | 3 | A clean multi-thinking workflow implementation built on the Agno v2.0 framework. 4 | The legacy complexity-based routing has been removed entirely; this module focuses solely on the multi-thinking methodology. 5 | """ 6 | 7 | import logging 8 | import re 9 | import time 10 | from dataclasses import dataclass 11 | from typing import Any 12 | 13 | from agno.workflow.router import Router 14 | from agno.workflow.step import Step 15 | from agno.workflow.types import StepInput, StepOutput 16 | from agno.workflow.workflow import Workflow 17 | 18 | from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config 19 | 20 | # Import at module level - moved from function to avoid PLC0415 21 | from mcp_server_mas_sequential_thinking.core.models import ThoughtData 22 | from mcp_server_mas_sequential_thinking.processors.multi_thinking_processor import ( 23 | MultiThinkingSequentialProcessor, 24 | create_multi_thinking_step_output, 25 | ) 26 | 27 | logger = logging.getLogger(__name__) 28 | 29 | 30 | @dataclass 31 | class MultiThinkingWorkflowResult: 32 | """Result from Multi-Thinking workflow execution.""" 33 | 34 | content: str 35 | strategy_used: str 36 | processing_time: float 37 | complexity_score: float 38 | step_name: str 39 | thinking_sequence: list[str] 40 | cost_reduction: float 41 | 42 | 43 | class MultiThinkingWorkflowRouter: 44 | """Pure Multi-Thinking workflow router using Agno v2.0.""" 45 | 46 | def __init__(self) -> None: 47 | """Initialize Multi-Thinking workflow router.""" 48 | self.model_config = get_model_config() 49 | 50 | # Initialize Multi-Thinking processor 51 | self.multi_thinking_processor = MultiThinkingSequentialProcessor() 52 | 53 | # Create Multi-Thinking processing step 54 | self.multi_thinking_step = self._create_multi_thinking_step() 55 | 56 | # Create router that always selects Multi-Thinking 57 | self.router = Router( 58 | name="multi_thinking_router", 59 | selector=self._multi_thinking_selector, 60 | choices=[self.multi_thinking_step], 61 | ) 62 | 63 | # Create main workflow 64 | self.workflow = Workflow( 65 | name="multi_thinking_workflow", 66 |
steps=[self.router], 67 | ) 68 | 69 | logger.info("Multi-Thinking Workflow Router initialized") 70 | 71 | def _multi_thinking_selector(self, step_input: StepInput) -> list[Step]: 72 | """Selector that always returns Multi-Thinking processing.""" 73 | try: 74 | logger.info("🎩 MULTI-THINKING WORKFLOW ROUTING:") 75 | 76 | # Extract thought content for logging 77 | if isinstance(step_input.input, dict): 78 | thought_content = step_input.input.get("thought", "") 79 | thought_number = step_input.input.get("thought_number", 1) 80 | total_thoughts = step_input.input.get("total_thoughts", 1) 81 | else: 82 | thought_content = str(step_input.input) 83 | thought_number = 1 84 | total_thoughts = 1 85 | 86 | logger.info( 87 | " 📝 Input: %s%s", 88 | thought_content[:100], 89 | "..." if len(thought_content) > 100 else "", 90 | ) 91 | logger.info(" 🔢 Progress: %s/%s", thought_number, total_thoughts) 92 | logger.info(" ✅ Multi-Thinking selected - exclusive thinking methodology") 93 | 94 | return [self.multi_thinking_step] 95 | 96 | except Exception as e: 97 | logger.exception("Error in Multi-Thinking selector: %s", e) 98 | logger.warning("Continuing with Multi-Thinking processing despite error") 99 | return [self.multi_thinking_step] 100 | 101 | def _create_multi_thinking_step(self) -> Step: 102 | """Create Six Thinking Hats processing step.""" 103 | 104 | async def multi_thinking_executor( 105 | step_input: StepInput, session_state: dict[str, Any] 106 | ) -> StepOutput: 107 | """Execute Multi-Thinking thinking process.""" 108 | try: 109 | logger.info("🎩 MULTI-THINKING STEP EXECUTION:") 110 | 111 | # Extract thought content and metadata 112 | if isinstance(step_input.input, dict): 113 | thought_content = step_input.input.get( 114 | "thought", str(step_input.input) 115 | ) 116 | thought_number = step_input.input.get("thought_number", 1) 117 | total_thoughts = step_input.input.get("total_thoughts", 1) 118 | context = step_input.input.get("context", "") 119 | else: 120 | 
thought_content = str(step_input.input) 121 | thought_number = 1 122 | total_thoughts = 1 123 | context = "" 124 | 125 | # ThoughtData is now imported at module level 126 | 127 | # Extract context preservation fields from input if available 128 | if isinstance(step_input.input, dict): 129 | is_revision = step_input.input.get("isRevision", False) 130 | branch_from_thought = step_input.input.get("branchFromThought") 131 | branch_id = step_input.input.get("branchId") 132 | needs_more_thoughts = step_input.input.get( 133 | "needsMoreThoughts", False 134 | ) 135 | next_thought_needed = step_input.input.get( 136 | "nextThoughtNeeded", True 137 | ) 138 | else: 139 | is_revision = False 140 | branch_from_thought = None 141 | branch_id = None 142 | needs_more_thoughts = False 143 | next_thought_needed = True 144 | 145 | thought_data = ThoughtData( 146 | thought=thought_content, 147 | thoughtNumber=thought_number, 148 | totalThoughts=total_thoughts, 149 | nextThoughtNeeded=next_thought_needed, 150 | isRevision=is_revision, 151 | branchFromThought=branch_from_thought, 152 | branchId=branch_id, 153 | needsMoreThoughts=needs_more_thoughts, 154 | ) 155 | 156 | logger.info(" 📝 Input: %s...", thought_content[:100]) 157 | logger.info(" 🔢 Thought: %s/%s", thought_number, total_thoughts) 158 | 159 | # Process with Multi-Thinking 160 | result = ( 161 | await self.multi_thinking_processor.process_with_multi_thinking( 162 | thought_data, context 163 | ) 164 | ) 165 | 166 | # Store metadata in session_state 167 | session_state["current_strategy"] = result.strategy_used 168 | session_state["current_complexity_score"] = result.complexity_score 169 | session_state["thinking_sequence"] = result.thinking_sequence 170 | session_state["cost_reduction"] = result.cost_reduction 171 | 172 | logger.info(" ✅ Multi-Thinking completed: %s", result.strategy_used) 173 | logger.info(" 📊 Complexity: %.1f", result.complexity_score) 174 | logger.info(" 💰 Cost Reduction: %.1f%%", result.cost_reduction) 175 | 
176 | return create_multi_thinking_step_output(result) 177 | 178 | except Exception as e: 179 | logger.exception(" ❌ Multi-Thinking execution failed") 180 | return StepOutput( 181 | content=f"Multi-Thinking processing failed: {e!s}", 182 | success=False, 183 | error=str(e), 184 | step_name="multi_thinking_error", 185 | ) 186 | 187 | return Step( 188 | name="multi_thinking_processing", 189 | executor=multi_thinking_executor, 190 | description="Six Thinking Hats sequential processing", 191 | ) 192 | 193 | async def process_thought_workflow( 194 | self, thought_data: "ThoughtData", context_prompt: str 195 | ) -> MultiThinkingWorkflowResult: 196 | """Process thought using Multi-Thinking workflow.""" 197 | start_time = time.time() 198 | 199 | try: 200 | logger.info("🚀 MULTI-THINKING WORKFLOW INITIALIZATION:") 201 | logger.info( 202 | " 📝 Thought: %s%s", 203 | thought_data.thought[:100], 204 | "..." if len(thought_data.thought) > 100 else "", 205 | ) 206 | logger.info( 207 | " 🔢 Thought Number: %s/%s", 208 | thought_data.thoughtNumber, 209 | thought_data.totalThoughts, 210 | ) 211 | logger.info(" 📋 Context Length: %d chars", len(context_prompt)) 212 | logger.info( 213 | " ⏰ Start Time: %s", 214 | time.strftime("%H:%M:%S", time.localtime(start_time)), 215 | ) 216 | 217 | # Prepare workflow input for Multi-Thinking 218 | workflow_input = { 219 | "thought": thought_data.thought, 220 | "thought_number": thought_data.thoughtNumber, 221 | "total_thoughts": thought_data.totalThoughts, 222 | "context": context_prompt, 223 | } 224 | 225 | logger.info("📦 WORKFLOW INPUT PREPARATION:") 226 | logger.info(" 📊 Input Keys: %s", list(workflow_input.keys())) 227 | logger.info(" 📏 Input Size: %d chars", len(str(workflow_input))) 228 | 229 | # Initialize session_state for metadata tracking 230 | session_state: dict[str, Any] = { 231 | "start_time": start_time, 232 | "thought_number": thought_data.thoughtNumber, 233 | "total_thoughts": thought_data.totalThoughts, 234 | } 235 | 236 | 
logger.info("🎯 SESSION STATE SETUP:") 237 | logger.info(" 🔑 State Keys: %s", list(session_state.keys())) 238 | logger.info(" 📈 Metadata: %s", session_state) 239 | 240 | logger.info( 241 | "▶️ EXECUTING Multi-Thinking workflow for thought #%s", 242 | thought_data.thoughtNumber, 243 | ) 244 | 245 | # Execute Multi-Thinking workflow 246 | logger.info("🔄 WORKFLOW EXECUTION START...") 247 | result = await self.workflow.arun( 248 | input=workflow_input, session_state=session_state 249 | ) 250 | logger.info("✅ WORKFLOW EXECUTION COMPLETED") 251 | 252 | processing_time = time.time() - start_time 253 | 254 | # Extract clean content from result 255 | content = self._extract_clean_content(result) 256 | 257 | logger.info("📋 CONTENT VALIDATION:") 258 | logger.info( 259 | " ✅ Content extracted successfully: %d characters", len(content) 260 | ) 261 | logger.info( 262 | " 📝 Content preview: %s%s", 263 | content[:150], 264 | "..." if len(content) > 150 else "", 265 | ) 266 | 267 | # Get metadata from session_state 268 | complexity_score = session_state.get("current_complexity_score", 0.0) 269 | strategy_used = session_state.get("current_strategy", "multi_thinking") 270 | thinking_sequence = session_state.get("thinking_sequence", []) 271 | cost_reduction = session_state.get("cost_reduction", 0.0) 272 | 273 | logger.info("📊 WORKFLOW RESULT COMPILATION:") 274 | logger.info(" 🎯 Strategy used: %s", strategy_used) 275 | logger.info(" 🧠 Thinking sequence: %s", " → ".join(thinking_sequence)) 276 | logger.info(" 📈 Complexity score: %.1f", complexity_score) 277 | logger.info(" 💰 Cost reduction: %.1f%%", cost_reduction) 278 | logger.info(" ⏱️ Processing time: %.3fs", processing_time) 279 | 280 | workflow_result = MultiThinkingWorkflowResult( 281 | content=content, 282 | strategy_used=strategy_used, 283 | processing_time=processing_time, 284 | complexity_score=complexity_score, 285 | step_name="multi_thinking_execution", 286 | thinking_sequence=thinking_sequence, 287 | 
cost_reduction=cost_reduction, 288 | ) 289 | 290 | logger.info("🎉 MULTI-THINKING WORKFLOW COMPLETION:") 291 | logger.info( 292 | " ✅ Completed: strategy=%s, time=%.3fs, score=%.1f, reduction=%.1f%%", 293 | strategy_used, 294 | processing_time, 295 | complexity_score, 296 | cost_reduction, 297 | ) 298 | 299 | return workflow_result 300 | 301 | except Exception as e: 302 | processing_time = time.time() - start_time 303 | logger.exception( 304 | "Multi-Thinking workflow execution failed after %.3fs", processing_time 305 | ) 306 | 307 | return MultiThinkingWorkflowResult( 308 | content=f"Error processing thought with Multi-Thinking: {e!s}", 309 | strategy_used="error_fallback", 310 | processing_time=processing_time, 311 | complexity_score=0.0, 312 | step_name="error_handling", 313 | thinking_sequence=[], 314 | cost_reduction=0.0, 315 | ) 316 | 317 | def _extract_clean_content(self, result: Any) -> str: 318 | """Extract clean content from workflow result.""" 319 | 320 | def extract_recursive(obj: Any, depth: int = 0) -> str: 321 | """Recursively extract clean content from nested objects.""" 322 | if depth > 10: # Prevent infinite recursion 323 | return str(obj) 324 | 325 | # Handle dictionary with common content keys 326 | if isinstance(obj, dict): 327 | for key in [ 328 | "result", 329 | "content", 330 | "message", 331 | "text", 332 | "response", 333 | "output", 334 | "answer", 335 | ]: 336 | if key in obj: 337 | return extract_recursive(obj[key], depth + 1) 338 | # Fallback to any string content 339 | for value in obj.values(): 340 | if isinstance(value, str) and len(value.strip()) > 10: 341 | return value.strip() 342 | return str(obj) 343 | 344 | # Handle objects with content attributes 345 | if hasattr(obj, "content"): 346 | content = obj.content 347 | if isinstance(content, str): 348 | return content.strip() 349 | return extract_recursive(content, depth + 1) 350 | 351 | # Handle other output objects 352 | if hasattr(obj, "output"): 353 | return 
extract_recursive(obj.output, depth + 1) 354 | 355 | # Handle list/tuple - extract first meaningful content 356 | if isinstance(obj, (list, tuple)) and obj: 357 | for item in obj: 358 | result = extract_recursive(item, depth + 1) 359 | if isinstance(result, str) and len(result.strip()) > 10: 360 | return result.strip() 361 | 362 | # If it's already a string, clean it up 363 | if isinstance(obj, str): 364 | content = obj.strip() 365 | 366 | # Remove object representations 367 | if any( 368 | content.startswith(pattern) 369 | for pattern in [ 370 | "RunOutput(", 371 | "TeamRunOutput(", 372 | "StepOutput(", 373 | "WorkflowResult(", 374 | "{'result':", 375 | '{"result":', 376 | "{'content':", 377 | '{"content":', 378 | ] 379 | ): 380 | # Try to extract content using regex (re imported at module level) 381 | 382 | patterns = [ 383 | (r"content='([^']*)'", 1), 384 | (r'content="([^"]*)"', 1), 385 | (r"'result':\s*'([^']*)'", 1), 386 | (r'"result":\s*"([^"]*)"', 1), 387 | (r"'([^']{20,})'", 1), 388 | (r'"([^"]{20,})"', 1), 389 | ] 390 | 391 | for pattern, group in patterns: 392 | match = re.search(pattern, content) 393 | if match: 394 | extracted = match.group(group).strip() 395 | if len(extracted) > 10: 396 | return extracted 397 | 398 | # Clean up object syntax 399 | cleaned = re.sub(r'[{}()"\']', " ", content) 400 | cleaned = re.sub( 401 | r"\b(RunOutput|TeamRunOutput|StepOutput|content|result|success|error)\b", 402 | " ", 403 | cleaned, 404 | ) 405 | cleaned = re.sub(r"\s+", " ", cleaned).strip() 406 | 407 | if len(cleaned) > 20: 408 | return cleaned 409 | 410 | return content 411 | 412 | # Fallback 413 | result = str(obj).strip() 414 | if len(result) > 20 and not any( 415 | result.startswith(pattern) 416 | for pattern in ["RunOutput(", "TeamRunOutput(", "StepOutput(", "<"] 417 | ): 418 | return result 419 | 420 | return "Multi-Thinking processing completed successfully" 421 | 422 | return extract_recursive(result) 423 | 424 | 425 | # For backward compatibility with 
the old AgnoWorkflowRouter name 426 | AgnoWorkflowRouter = MultiThinkingWorkflowRouter 427 | WorkflowResult = MultiThinkingWorkflowResult 428 | ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/infrastructure/persistent_memory.py: -------------------------------------------------------------------------------- ```python 1 | """Persistent memory management with SQLAlchemy and memory pruning.""" 2 | 3 | import os 4 | from datetime import datetime, timedelta 5 | from pathlib import Path 6 | 7 | from sqlalchemy import ( 8 | JSON, 9 | Boolean, 10 | Column, 11 | DateTime, 12 | Float, 13 | ForeignKey, 14 | Index, 15 | Integer, 16 | String, 17 | Text, 18 | create_engine, 19 | desc, 20 | ) 21 | from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker 22 | from sqlalchemy.pool import StaticPool 23 | 24 | from mcp_server_mas_sequential_thinking.config.constants import ( 25 | DatabaseConstants, 26 | DefaultSettings, 27 | ) 28 | from mcp_server_mas_sequential_thinking.core.models import ThoughtData 29 | 30 | from .logging_config import get_logger 31 | 32 | logger = get_logger(__name__) 33 | 34 | 35 | class Base(DeclarativeBase): 36 | """Base class for SQLAlchemy models with modern typing support.""" 37 | 38 | 39 | class SessionRecord(Base): 40 | """Database model for session storage.""" 41 | 42 | __tablename__ = "sessions" 43 | 44 | id = Column(String, primary_key=True) 45 | created_at = Column(DateTime, default=datetime.utcnow) 46 | updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) 47 | total_thoughts = Column(Integer, default=0) 48 | total_cost = Column(Float, default=0.0) 49 | provider = Column(String, default="deepseek") 50 | 51 | # Relationship to thoughts 52 | thoughts = relationship( 53 | "ThoughtRecord", back_populates="session", cascade="all, delete-orphan" 54 | ) 55 | 56 | # Index for performance 57 | __table_args__ = ( 58 | 
class ThoughtRecord(Base):
    """Database model for thought storage.

    One row per stored thought, linked to its owning session through
    ``session_id``. Besides the thought itself the row keeps optional
    processing metadata (strategy, costs, token usage, timing) and the
    generated response.
    """

    __tablename__ = "thoughts"

    # Identity and core thought fields
    id = Column(Integer, primary_key=True)
    session_id = Column(String, ForeignKey("sessions.id"), nullable=False)
    thought_number = Column(Integer, nullable=False)
    thought = Column(Text, nullable=False)
    total_thoughts = Column(Integer, nullable=False)
    next_needed = Column(Boolean, nullable=False)
    branch_from = Column(Integer, nullable=True)
    branch_id = Column(String, nullable=True)

    # Processing metadata
    processing_strategy = Column(String, nullable=True)
    complexity_score = Column(Float, nullable=True)
    estimated_cost = Column(Float, nullable=True)
    actual_cost = Column(Float, nullable=True)
    token_usage = Column(Integer, nullable=True)
    processing_time = Column(Float, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    # Response data
    response = Column(Text, nullable=True)
    specialist_used = Column(JSON, nullable=True)  # List of specialists used

    # Relationship
    session = relationship("SessionRecord", back_populates="thoughts")

    # Indexes for performance
    __table_args__ = (
        Index("ix_thought_session", "session_id"),
        Index("ix_thought_number", "thought_number"),
        Index("ix_thought_created", "created_at"),
        Index("ix_thought_branch", "branch_from", "branch_id"),
    )

    def to_thought_data(self) -> ThoughtData:
        """Convert database record to ThoughtData model.

        Returns:
            A ``ThoughtData`` built from the stored columns. This table has
            no columns for ``isRevision`` or ``needsMoreThoughts``, so both
            are filled with fixed values rather than the values originally
            supplied.
        """
        return ThoughtData(
            thought=str(self.thought),
            thoughtNumber=int(self.thought_number),
            totalThoughts=int(self.total_thoughts),
            nextThoughtNeeded=bool(self.next_needed),
            isRevision=False,  # Assuming default value is intentional
            branchFromThought=self.branch_from,
            branchId=self.branch_id,
            needsMoreThoughts=True,  # Assuming default value is intentional
        )
def __init__(self, database_url: str | None = None) -> None:
    """Set up the engine, the schema and the session factory.

    Args:
        database_url: SQLAlchemy database URL. When omitted, a SQLite file
            under ``~/.sequential_thinking/data`` is used and the directory
            is created if necessary.
    """
    if database_url is None:
        storage_dir = Path.home() / ".sequential_thinking" / "data"
        storage_dir.mkdir(parents=True, exist_ok=True)
        database_url = f"sqlite:///{storage_dir}/memory.db"

    # Pick engine options by backend, then build the engine in one place.
    if database_url.startswith("sqlite"):
        engine_options = {
            "poolclass": StaticPool,
            "connect_args": {"check_same_thread": False},
            "echo": False,
        }
    else:
        engine_options = {
            "pool_pre_ping": True,
            "pool_size": DatabaseConstants.CONNECTION_POOL_SIZE,
            "max_overflow": DatabaseConstants.CONNECTION_POOL_OVERFLOW,
        }
    self.engine = create_engine(database_url, **engine_options)

    # Make sure every table declared on Base exists before first use.
    Base.metadata.create_all(self.engine)

    self.SessionLocal = sessionmaker(bind=self.engine)

    logger.info(f"Persistent memory initialized with database: {database_url}")
def _ensure_session_exists(self, db: Session, session_id: str) -> SessionRecord:
    """Return the session row for ``session_id``, creating it if missing.

    The row is created inside the caller's ``db`` session rather than via
    ``create_session`` (which opens its own session). That keeps the
    creation in the caller's transaction and guarantees a non-None return
    value; re-querying after a commit made elsewhere may not see the new
    row, depending on the transaction isolation level.

    Args:
        db: The open ORM session the caller is working in.
        session_id: Identifier of the session to look up or create.

    Returns:
        The existing or newly added ``SessionRecord`` attached to ``db``.
    """
    session_record = (
        db.query(SessionRecord).filter(SessionRecord.id == session_id).first()
    )

    if session_record is None:
        session_record = SessionRecord(
            id=session_id, provider=DefaultSettings.DEFAULT_PROVIDER
        )
        db.add(session_record)
        # Flush so column defaults (counters, timestamps) are populated
        # before the caller updates the statistics on this object.
        db.flush()
        logger.info(f"Created new session: {session_id}")

    return session_record
= float(complexity_score) # type: ignore[assignment] 284 | if estimated_cost := metadata.get("estimated_cost"): 285 | thought_record.estimated_cost = float(estimated_cost) # type: ignore[assignment] 286 | if actual_cost := metadata.get("actual_cost"): 287 | thought_record.actual_cost = float(actual_cost) # type: ignore[assignment] 288 | if token_usage := metadata.get("token_usage"): 289 | thought_record.token_usage = int(token_usage) # type: ignore[assignment] 290 | if processing_time := metadata.get("processing_time"): 291 | thought_record.processing_time = float(processing_time) # type: ignore[assignment] 292 | 293 | thought_record.specialist_used = metadata.get("specialists", []) 294 | thought_record.processed_at = datetime.utcnow() # type: ignore[assignment] 295 | 296 | def _update_session_stats( 297 | self, session_record: SessionRecord, processing_metadata: dict | None 298 | ) -> None: 299 | """Update session statistics.""" 300 | if session_record: 301 | session_record.totalThoughts += 1 # type: ignore[assignment] 302 | session_record.updated_at = datetime.utcnow() # type: ignore[assignment] 303 | if processing_metadata and processing_metadata.get("actual_cost"): 304 | session_record.total_cost += float(processing_metadata["actual_cost"]) # type: ignore[assignment] 305 | 306 | def _handle_branching( 307 | self, db: Session, session_id: str, thought_data: ThoughtData 308 | ) -> None: 309 | """Handle branch record creation and updates.""" 310 | if not thought_data.branchId: 311 | return 312 | 313 | branch_record = ( 314 | db.query(BranchRecord) 315 | .filter( 316 | BranchRecord.session_id == session_id, 317 | BranchRecord.branchId == thought_data.branchId, 318 | ) 319 | .first() 320 | ) 321 | 322 | if not branch_record: 323 | branch_record = BranchRecord( 324 | session_id=session_id, 325 | branch_id=thought_data.branchId, 326 | parent_thought=thought_data.branchFromThought, 327 | ) 328 | db.add(branch_record) 329 | 330 | branch_record.thought_count += 1 # type: 
ignore[assignment] 331 | 332 | def get_session_thoughts( 333 | self, session_id: str, limit: int | None = None 334 | ) -> list[ThoughtRecord]: 335 | """Retrieve thoughts for a session.""" 336 | with self.SessionLocal() as db: 337 | query = ( 338 | db.query(ThoughtRecord) 339 | .filter(ThoughtRecord.session_id == session_id) 340 | .order_by(ThoughtRecord.thoughtNumber) 341 | ) 342 | 343 | if limit: 344 | query = query.limit(limit) 345 | 346 | return query.all() 347 | 348 | def get_thought_by_number( 349 | self, session_id: str, thought_number: int 350 | ) -> ThoughtRecord | None: 351 | """Get a specific thought by number.""" 352 | with self.SessionLocal() as db: 353 | return ( 354 | db.query(ThoughtRecord) 355 | .filter( 356 | ThoughtRecord.session_id == session_id, 357 | ThoughtRecord.thoughtNumber == thought_number, 358 | ) 359 | .first() 360 | ) 361 | 362 | def get_branch_thoughts( 363 | self, session_id: str, branch_id: str 364 | ) -> list[ThoughtRecord]: 365 | """Get all thoughts in a specific branch.""" 366 | with self.SessionLocal() as db: 367 | return ( 368 | db.query(ThoughtRecord) 369 | .filter( 370 | ThoughtRecord.session_id == session_id, 371 | ThoughtRecord.branchId == branch_id, 372 | ) 373 | .order_by(ThoughtRecord.thoughtNumber) 374 | .all() 375 | ) 376 | 377 | def prune_old_sessions( 378 | self, older_than_days: int = 30, keep_recent: int = 100 379 | ) -> int: 380 | """Prune old sessions to manage storage space.""" 381 | cutoff_date = datetime.utcnow() - timedelta(days=older_than_days) 382 | 383 | with self.SessionLocal() as db: 384 | # Get sessions older than cutoff, excluding most recent ones 385 | old_sessions = ( 386 | db.query(SessionRecord) 387 | .filter(SessionRecord.updated_at < cutoff_date) 388 | .order_by(desc(SessionRecord.updated_at)) 389 | .offset(keep_recent) 390 | ) 391 | 392 | deleted_count = 0 393 | for session in old_sessions: 394 | db.delete(session) # Cascade will handle thoughts and branches 395 | deleted_count += 1 396 | 397 | 
def get_usage_stats(self, days_back: int = 7) -> dict:
    """Summarise sessions and thoughts created in the last ``days_back`` days.

    Args:
        days_back: Size of the reporting window in days.

    Returns:
        A dict with session/thought counts, cost and token totals, the
        per-thought averages and a per-strategy breakdown.
    """
    window_start = datetime.utcnow() - timedelta(days=days_back)

    with self.SessionLocal() as db:
        session_count = (
            db.query(SessionRecord)
            .filter(SessionRecord.created_at >= window_start)
            .count()
        )
        recent_thoughts = (
            db.query(ThoughtRecord)
            .filter(ThoughtRecord.created_at >= window_start)
            .all()
        )

        total_thoughts = len(recent_thoughts)
        # Averages divide by at least 1 so an empty window yields zeros.
        divisor = max(total_thoughts, 1)

        total_cost: float = float(
            sum(float(row.actual_cost or 0) for row in recent_thoughts)
        )
        total_tokens: int = int(
            sum(int(row.token_usage or 0) for row in recent_thoughts)
        )
        processing_time_sum = sum(row.processing_time or 0 for row in recent_thoughts)

        # Per-strategy totals; rows without a strategy go under "unknown".
        strategy_stats = {}
        for row in recent_thoughts:
            bucket = strategy_stats.setdefault(
                row.processing_strategy or "unknown",
                {"count": 0, "cost": 0, "tokens": 0},
            )
            bucket["count"] += 1
            bucket["cost"] += row.actual_cost or 0
            bucket["tokens"] += row.token_usage or 0

        return {
            "period_days": days_back,
            "session_count": session_count,
            "total_thoughts": total_thoughts,
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "avg_cost_per_thought": total_cost / divisor,
            "avg_tokens_per_thought": total_tokens / divisor,
            "avg_processing_time": processing_time_sum / divisor,
            "strategy_breakdown": strategy_stats,
        }
def record_usage_metrics(
    self,
    provider: str,
    processing_strategy: str,
    complexity_level: str,
    thought_count: int = 1,
    tokens: int = 0,
    cost: float = 0.0,
    processing_time: float = 0.0,
    success: bool = True,
) -> None:
    """Record usage metrics for cost optimization.

    Metrics are aggregated into one row per provider / strategy /
    complexity level dated today or later; a new row is created when none
    matches.

    Args:
        provider: Model provider name.
        processing_strategy: Strategy used for the recorded thoughts.
        complexity_level: Complexity bucket of the recorded thoughts.
        thought_count: Number of thoughts this call accounts for.
        tokens: Tokens consumed by those thoughts.
        cost: Cost incurred by those thoughts.
        processing_time: Average processing time of those thoughts.
        success: Whether the recorded thoughts succeeded.
    """
    with self.SessionLocal() as db:
        # Check if we have a record for today
        today = datetime.utcnow().date()
        day_start = datetime.combine(today, datetime.min.time())
        existing = (
            db.query(UsageMetrics)
            .filter(
                UsageMetrics.date >= day_start,
                UsageMetrics.provider == provider,
                UsageMetrics.processing_strategy == processing_strategy,
                UsageMetrics.complexity_level == complexity_level,
            )
            .first()
        )

        if existing:
            previous_count = int(existing.thought_count or 0)
            combined_count = previous_count + thought_count

            existing.thought_count = combined_count  # type: ignore[assignment]
            existing.total_tokens = (existing.total_tokens or 0) + tokens  # type: ignore[assignment]
            existing.total_cost = (existing.total_cost or 0.0) + cost  # type: ignore[assignment]

            # Skip the weighted averages when there is nothing to weight by.
            if combined_count > 0:
                existing.avg_processing_time = (  # type: ignore[assignment]
                    float(existing.avg_processing_time or 0.0) * previous_count
                    + processing_time * thought_count
                ) / combined_count

                # Update the rate on successes too, so it can recover after
                # a failure instead of only ever decreasing.
                prior_rate = (
                    1.0
                    if existing.success_rate is None
                    else float(existing.success_rate)
                )
                succeeded = thought_count if success else 0
                existing.success_rate = (  # type: ignore[assignment]
                    prior_rate * previous_count + succeeded
                ) / combined_count
        else:
            # Create new record
            metrics = UsageMetrics(
                provider=provider,
                processing_strategy=processing_strategy,
                complexity_level=complexity_level,
                thought_count=thought_count,
                total_tokens=tokens,
                total_cost=cost,
                avg_processing_time=processing_time,
                success_rate=1.0 if success else 0.0,
            )
            db.add(metrics)

        db.commit()
def get_database_url_from_env() -> str:
    """Resolve the database URL, preferring the ``DATABASE_URL`` variable.

    Returns:
        The non-empty value of ``DATABASE_URL`` when set; otherwise a SQLite
        URL pointing at ``~/.sequential_thinking/data/memory.db`` (the
        directory is created on demand).
    """
    configured_url = os.getenv("DATABASE_URL")
    if configured_url:
        return configured_url

    # No explicit configuration: use the local SQLite file.
    data_dir = Path.home().joinpath(".sequential_thinking", "data")
    data_dir.mkdir(parents=True, exist_ok=True)
    return f"sqlite:///{data_dir}/memory.db"
@dataclass(frozen=True)
class ThinkingTimingConfig:
    """Immutable time budget for one thinking direction.

    All durations are expressed in seconds.
    """

    # Thinking direction this budget applies to
    direction: ThinkingDirection
    # Default, lower and upper time allowances
    default_time_seconds: int
    min_time_seconds: int
    max_time_seconds: int
    # Whether it's a quick reaction mode (like emotional thinking)
    is_quick_reaction: bool = False
@dataclass(frozen=True)
class ThinkingCapability:
    """Describe one thinking direction and build its agent instructions.

    Subclasses supply the direction-specific guidance through
    ``_get_specific_instructions``.
    """

    thinking_direction: ThinkingDirection
    role: str
    description: str
    role_description: str

    # Cognitive characteristics
    thinking_mode: str
    cognitive_focus: str
    output_style: str

    # Time management
    timing_config: ThinkingTimingConfig

    # Enhanced features
    tools: list[Any] | None = None
    reasoning_level: int = 1
    memory_enabled: bool = False

    def __post_init__(self):
        """Fill in the default tool list when the caller passed none."""
        if self.tools is not None:
            return

        default_tools = [ReasoningTools]
        exa_usable = EXA_AVAILABLE and ExaTools is not None
        # Every direction except SYNTHESIS also gets ExaTools when usable.
        if exa_usable and self.thinking_direction != ThinkingDirection.SYNTHESIS:
            default_tools.append(ExaTools)

        # The dataclass is frozen, so bypass the blocked attribute assignment.
        object.__setattr__(self, "tools", default_tools)

    def get_instructions(
        self, context: str = "", previous_results: dict | None = None
    ) -> list[str]:
        """Assemble the instruction lines for this analysis mode.

        Args:
            context: Optional context text appended after the guidelines.
            previous_results: Optional mapping of direction name to earlier
                result text; each result is shown truncated to 100
                characters.

        Returns:
            The instruction lines in order: core principles, specific
            guidelines, context, then previous insights.
        """
        instructions = [
            f"You are operating in {self.thinking_mode} analysis mode.",
            f"Role: {self.role}",
            f"Cognitive Focus: {self.cognitive_focus}",
            "",
            "CORE PRINCIPLES:",
            f"1. Apply ONLY {self.thinking_mode} approach - maintain strict focus",
            f"2. Time allocation: {self.timing_config.default_time_seconds} seconds for thorough analysis",
            f"3. Output approach: {self.output_style}",
            "",
            f"Your specific responsibility: {self.role_description}",
        ]

        instructions.extend(self._get_specific_instructions())

        if context:
            instructions += ["", f"Context: {context}"]

        if previous_results:
            insight_lines = [
                f" {self._format_previous_result_label(name)}: {text[:100]}..."
                for name, text in previous_results.items()
            ]
            instructions += [
                "",
                "Previous analysis insights from other perspectives:",
                *insight_lines,
            ]

        return instructions

    def _format_previous_result_label(self, direction_name: str) -> str:
        """Map a direction name to its display label (case-insensitive)."""
        labels = {
            "factual": "Factual analysis",
            "emotional": "Emotional perspective",
            "critical": "Critical evaluation",
            "optimistic": "Optimistic view",
            "creative": "Creative thinking",
            "synthesis": "Strategic synthesis",
        }
        key = direction_name.lower()
        return labels[key] if key in labels else "Analysis"

    @abstractmethod
    def _get_specific_instructions(self) -> list[str]:
        """Return the guidelines specific to this thinking direction."""
def _get_specific_instructions(self) -> list[str]:
    """Return the prompt lines that are specific to factual analysis.

    The research section is inserted between the guidelines and the
    forbidden list only when ``EXA_AVAILABLE`` is truthy and ``ExaTools``
    is not ``None``.
    """
    guidelines = [
        "",
        "FACTUAL ANALYSIS GUIDELINES:",
        "• Use simple statements to present facts: 'The data shows...', 'Known information is...'",
        "• Avoid technical jargon, explain data in everyday language",
        "• Present only verified facts and objective data",
        "• Avoid opinions, interpretations, or emotional reactions",
        "• Identify what information is missing and needed",
        "• Separate facts from assumptions clearly",
    ]
    research = [
        "",
        "RESEARCH CAPABILITIES:",
        "• Use search_exa() to find current facts and data when needed",
        "• Search for recent information, statistics, or verified data",
        "• Cite sources when presenting factual information",
        "• Prioritize authoritative sources and recent data",
    ]
    forbidden = [
        "",
        "FORBIDDEN in factual analysis mode:",
        "- Personal opinions or judgments",
        "- Emotional responses or gut feelings",
        "- Speculation or 'what if' scenarios",
        "- Value judgments (good/bad, right/wrong)",
    ]

    research_enabled = EXA_AVAILABLE and ExaTools is not None
    if not research_enabled:
        return guidelines + forbidden
    return guidelines + research + forbidden
super().__init__( 246 | thinking_direction=ThinkingDirection.EMOTIONAL, 247 | role="Intuitive Emotional Processor", 248 | description="Provide emotional responses and intuitive insights", 249 | role_description="I express intuition and emotional reactions. No reasoning needed, just share feelings.", 250 | thinking_mode="intuitive_emotional", 251 | cognitive_focus="Emotional intelligence and intuitive processing", 252 | output_style="Intuitive reactions, emotional expression, humanized perspective", 253 | timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.EMOTIONAL], 254 | reasoning_level=1, # Lowest rationality, highest intuition 255 | memory_enabled=False, 256 | ) 257 | 258 | def _get_specific_instructions(self) -> list[str]: 259 | return [ 260 | "", 261 | "EMOTIONAL INTUITIVE GUIDELINES:", 262 | "• Start responses with 'I feel...', 'My intuition tells me...', 'My gut reaction is...'", 263 | "• Keep expressions brief and powerful - 30-second emotional snapshots", 264 | "• Express immediate gut reactions and feelings", 265 | "• Share intuitive hunches without justification", 266 | "• Include visceral, immediate responses", 267 | "• NO need to explain or justify feelings", 268 | "", 269 | "ENCOURAGED in emotional intuitive mode:", 270 | "- First impressions and gut reactions", 271 | "- Emotional responses to ideas or situations", 272 | "- Intuitive concerns or excitement", 273 | "- 'Sixth sense' about what might work", 274 | "", 275 | "Remember: This is a 30-second emotional snapshot, not analysis!", 276 | ] 277 | 278 | 279 | class CriticalThinkingCapability(ThinkingCapability): 280 | """Critical thinking capability: criticism and risk.""" 281 | 282 | def __init__(self) -> None: 283 | super().__init__( 284 | thinking_direction=ThinkingDirection.CRITICAL, 285 | role="Critical Risk Assessor", 286 | description="Critical analysis and risk identification", 287 | role_description="I identify risks and problems. 
def _get_specific_instructions(self) -> list[str]:
    """Return the prompt lines that are specific to critical assessment.

    The research section is added after the guidelines only when
    ``EXA_AVAILABLE`` is truthy and ``ExaTools`` is not ``None``; the
    key-areas section and closing note are always appended last.
    """
    parts = [
        "",
        "CRITICAL ASSESSMENT GUIDELINES:",
        "• Point out specific possible problems, not general pessimism",
        "• Use phrases like 'The risk is...', 'This could fail because...', 'A problem might be...'",
        "• Identify potential problems, risks, and weaknesses",
        "• Challenge assumptions and look for logical flaws",
        "• Consider worst-case scenarios and failure modes",
        "• Provide logical reasons for all concerns raised",
    ]

    if EXA_AVAILABLE and ExaTools is not None:
        parts += [
            "",
            "RESEARCH FOR CRITICAL ANALYSIS:",
            "• Search for counterexamples, failed cases, or criticism of similar ideas",
            "• Look for expert opinions that identify risks or problems",
            "• Find case studies of failures in similar contexts",
            "• Research regulatory or compliance issues that might apply",
        ]

    parts += [
        "",
        "KEY AREAS TO EXAMINE:",
        "- Logical inconsistencies in arguments",
        "- Practical obstacles and implementation challenges",
        "- Resource constraints and limitations",
        "- Potential negative consequences",
        "- Missing information or unproven assumptions",
        "",
        "Note: Be critical but constructive - identify real problems, not just pessimism.",
    ]
    return parts
def _get_specific_instructions(self) -> list[str]:
    """Return the prompt lines that are specific to optimistic value exploration.

    The research section sits between the guidelines and the key-areas
    section, and is present only when ``EXA_AVAILABLE`` is truthy and
    ``ExaTools`` is not ``None``.
    """
    guidelines = [
        "",
        "OPTIMISTIC VALUE EXPLORATION GUIDELINES:",
        "• Point out specific feasible benefits, not empty praise",
        "• Use phrases like 'The benefit is...', 'This creates... value', 'An opportunity here is...'",
        "• Focus on benefits, values, and positive outcomes",
        "• Explore best-case scenarios and opportunities",
        "• Identify feasible positive possibilities",
        "• Provide logical reasons for optimism",
    ]
    key_areas = [
        "",
        "KEY AREAS TO EXPLORE:",
        "- Benefits and positive outcomes",
        "- Opportunities for growth or improvement",
        "- Feasible best-case scenarios",
        "- Value creation possibilities",
        "- Strengths and positive aspects",
        "- Why this could work well",
        "",
        "Note: Be realistically optimistic - find genuine value, not false hope.",
    ]

    research: list[str] = []
    if EXA_AVAILABLE and ExaTools is not None:
        research = [
            "",
            "RESEARCH FOR OPTIMISTIC ANALYSIS:",
            "• Search for success stories and positive case studies",
            "• Look for evidence of benefits in similar situations",
            "• Find research supporting potential positive outcomes",
            "• Research market opportunities and growth potential",
        ]

    return [*guidelines, *research, *key_areas]
def _get_specific_instructions(self) -> list[str]:
    """Return the prompt lines that are specific to creative idea generation.

    Sections are collected in order and flattened into one list. The
    research section is included only when ``EXA_AVAILABLE`` is truthy
    and ``ExaTools`` is not ``None``.
    """
    guidelines = (
        "",
        "CREATIVE INNOVATION GUIDELINES:",
        "• Provide 3-5 specific creative ideas that could work",
        "• Use phrases like 'What if...', 'Another approach could be...', 'An alternative is...'",
        "• Generate new ideas, alternatives, and creative solutions",
        "• Think laterally - explore unconventional approaches",
        "• Break normal thinking patterns and assumptions",
        "• Suggest modifications, improvements, or entirely new approaches",
    )
    research = (
        "",
        "RESEARCH FOR CREATIVE INSPIRATION:",
        "• Search for innovative solutions in different industries",
        "• Look for creative approaches used in unrelated fields",
        "• Find emerging trends and new methodologies",
        "• Research breakthrough innovations and creative disruptions",
    )
    techniques = (
        "",
        "CREATIVE TECHNIQUES TO USE:",
        "- Lateral thinking and analogies",
        "- Random word associations",
        "- 'What if' scenarios and thought experiments",
        "- Reversal thinking (what's the opposite?)",
        "- Combination of unrelated elements",
        "- Alternative perspectives and viewpoints",
        "",
        "Note: Quantity over quality - generate many ideas without judgment.",
    )

    sections = [guidelines]
    if EXA_AVAILABLE and ExaTools is not None:
        sections.append(research)
    sections.append(techniques)
    return [line for section in sections for line in section]
SynthesisThinkingCapability(ThinkingCapability): 459 | """Synthesis thinking capability: metacognition and integration.""" 460 | 461 | def __init__(self) -> None: 462 | super().__init__( 463 | thinking_direction=ThinkingDirection.SYNTHESIS, 464 | role="Metacognitive Orchestrator", 465 | description="Thinking process management and comprehensive coordination", 466 | role_description="I integrate all perspectives and provide the final balanced answer. My output is what users see - it must be practical and human-friendly.", 467 | thinking_mode="metacognitive_synthetic", 468 | cognitive_focus="Metacognition and executive control", 469 | output_style="Comprehensive integration, process management, unified conclusions", 470 | timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.SYNTHESIS], 471 | reasoning_level=3, # Highest level of metacognition 472 | memory_enabled=True, # Need to remember all other thinking direction results 473 | ) 474 | 475 | def _get_specific_instructions(self) -> list[str]: 476 | return [ 477 | "", 478 | "STRATEGIC SYNTHESIS GUIDELINES:", 479 | "• Your primary goal: Answer the original question using insights from other analyses", 480 | "• Avoid generic rehashing - focus specifically on the question asked", 481 | "• Use other analyses' contributions as evidence/perspectives to build your answer", 482 | "• Provide practical, actionable insights users can understand", 483 | "", 484 | "CRITICAL QUESTION-FOCUSED APPROACH:", 485 | "1. Extract insights from other analyses that directly address the question", 486 | "2. Ignore generic statements - focus on question-relevant content", 487 | "3. Build a coherent answer that uses multiple perspectives as support", 488 | "4. 
End with a clear, direct response to what was originally asked", 489 | "", 490 | "KEY RESPONSIBILITIES:", 491 | "- Return to the original question and answer it directly", 492 | "- Use other analyses' insights as building blocks for your answer", 493 | "- Synthesize perspectives into a cohesive response to the specific question", 494 | "- Avoid academic summarization - focus on practical question-answering", 495 | "- Ensure your entire response serves the original question", 496 | "", 497 | "FINAL OUTPUT REQUIREMENTS:", 498 | "• This is the user's ONLY answer - it must directly address their question", 499 | "• Don't just summarize - synthesize into a clear answer", 500 | "• Remove content that doesn't directly relate to the original question", 501 | "• For philosophical questions: provide thoughtful answers, not just analysis", 502 | "• You may mention different analysis types, perspectives, or methodologies if relevant", 503 | "• Present your synthesis process transparently, showing how different viewpoints contribute", 504 | ] 505 | 506 | 507 | class MultiThinkingAgentFactory: 508 | """Multi-thinking Agent factory.""" 509 | 510 | # Thinking direction capability mapping 511 | THINKING_CAPABILITIES = { 512 | ThinkingDirection.FACTUAL: FactualThinkingCapability(), 513 | ThinkingDirection.EMOTIONAL: EmotionalThinkingCapability(), 514 | ThinkingDirection.CRITICAL: CriticalThinkingCapability(), 515 | ThinkingDirection.OPTIMISTIC: OptimisticThinkingCapability(), 516 | ThinkingDirection.CREATIVE: CreativeThinkingCapability(), 517 | ThinkingDirection.SYNTHESIS: SynthesisThinkingCapability(), 518 | } 519 | 520 | def __init__(self) -> None: 521 | self._agent_cache: dict[str, Agent] = {} # Cache for created agents 522 | 523 | def create_thinking_agent( 524 | self, 525 | thinking_direction: ThinkingDirection, 526 | model: Model, 527 | context: str = "", 528 | previous_results: dict | None = None, 529 | **kwargs, 530 | ) -> Agent: 531 | """Create a specific thinking direction 
def create_thinking_agent(
    self,
    thinking_direction: ThinkingDirection,
    model: Model,
    context: str = "",
    previous_results: dict | None = None,
    **kwargs,
) -> Agent:
    """Create, or reuse from the cache, the Agent for one thinking direction.

    Args:
        thinking_direction: Key into ``THINKING_CAPABILITIES`` selecting the
            capability that supplies role, description, tools and
            instructions.
        model: Model the returned agent is bound to.
        context: Context text forwarded to ``capability.get_instructions``;
            its hash is part of the cache key.
        previous_results: Earlier results forwarded to
            ``capability.get_instructions``.
        **kwargs: Extra keyword arguments passed to ``Agent(...)`` when a new
            agent is constructed. They are not applied on a cache hit.

    Returns:
        A cached agent with refreshed instructions and model, or a newly
        constructed agent that has been added to the cache.

    Raises:
        KeyError: If ``thinking_direction`` is not in ``THINKING_CAPABILITIES``.
    """
    capability = self.THINKING_CAPABILITIES[thinking_direction]
    instructions = capability.get_instructions(context, previous_results)

    # The key records only the model's class name, so two differently
    # configured models of the same class map to the same entry.
    cache_key = (
        f"{thinking_direction.value}_{model.__class__.__name__}_{hash(context)}"
    )

    cached_agent = self._agent_cache.get(cache_key)
    if cached_agent is not None:
        cached_agent.instructions = instructions
        # Rebind the model as well: previously the caller's model was
        # silently discarded and the agent kept the first model it was
        # created with.
        cached_agent.model = model
        return cached_agent

    agent = Agent(
        name=f"{thinking_direction.value.title()}AnalysisAgent",
        role=capability.role,
        description=capability.description,
        model=model,
        tools=capability.tools or None,
        instructions=instructions,
        markdown=True,
        **kwargs,
    )

    if capability.memory_enabled:
        agent.enable_user_memories = True

    self._agent_cache[cache_key] = agent

    logger.info(
        f"Created {thinking_direction.value} thinking agent with {capability.timing_config.default_time_seconds}s time limit"
    )
    return agent
def get_thinking_timing(thinking_direction: ThinkingDirection) -> ThinkingTimingConfig:
    """Return the timing configuration that the module-level factory reports for a direction."""
    timing_config = _thinking_factory.get_thinking_timing(thinking_direction)
    return timing_config


def get_all_thinking_directions() -> list[ThinkingDirection]:
    """Return every thinking direction known to the module-level factory."""
    directions = _thinking_factory.get_all_thinking_directions()
    return directions
@dataclass
class MultiThinkingProcessingResult:
    """Multi-thinking processing result.

    Value object returned by
    ``MultiThinkingSequentialProcessor.process_with_multi_thinking`` on both
    its success path and its error-fallback path.
    """

    content: str  # Final answer text, or the failure message on the error path
    strategy_used: str  # Routing strategy name; "error_fallback" when processing failed
    thinking_sequence: list[str]  # Direction values in strategy order; empty on failure
    processing_time: float  # Elapsed seconds measured with time.time()
    complexity_score: float  # Taken from the routing decision's complexity metrics; 0.0 on failure
    cost_reduction: float  # Router's estimated cost reduction (logged as a percentage); 0.0 on failure
    individual_results: dict[str, str]  # Results from each thinking direction
    step_name: str  # "multi_thinking_processing" on success, "error_handling" on failure
async def process_with_multi_thinking(
    self, thought_data: "ThoughtData", context_prompt: str = ""
) -> MultiThinkingProcessingResult:
    """Route a thought, run the handler for the chosen depth, and package the result.

    Args:
        thought_data: Thought to process. Its ``thought`` text is previewed
            in the log; the object is handed to the router and the handler.
        context_prompt: Context text forwarded unchanged to the handler.

    Returns:
        The processing result. Any exception raised while routing or
        processing is logged and converted into an ``"error_fallback"``
        result rather than propagated.
    """
    started_at = time.time()

    logger.info("Multi-thinking processing started")
    if logger.isEnabledFor(logging.INFO):
        logger.info("Input preview: %s", thought_data.thought[:100])
        logger.info("Context length: %d chars", len(context_prompt))

    try:
        # Step 1: ask the router which strategy to use.
        routing_decision = await self.router.route_thought(thought_data)
        strategy = routing_decision.strategy

        logger.info("Selected strategy: %s", strategy.name)
        if logger.isEnabledFor(logging.INFO):
            logger.info(
                "Thinking sequence: %s",
                [direction.value for direction in strategy.thinking_sequence],
            )

        # Step 2: pick the handler for the strategy's depth. Anything that
        # is not SINGLE, DOUBLE or TRIPLE is treated as the full sequence.
        depth = strategy.complexity
        handler = self._process_full_direction_sequence
        for candidate_depth, candidate_handler in (
            (ProcessingDepth.SINGLE, self._process_single_direction),
            (ProcessingDepth.DOUBLE, self._process_double_direction_sequence),
            (ProcessingDepth.TRIPLE, self._process_triple_direction_sequence),
        ):
            if depth == candidate_depth:
                handler = candidate_handler
                break

        outcome = await handler(thought_data, context_prompt, routing_decision)

        elapsed = time.time() - started_at

        packaged = MultiThinkingProcessingResult(
            content=outcome["final_content"],
            strategy_used=strategy.name,
            thinking_sequence=[
                direction.value for direction in strategy.thinking_sequence
            ],
            processing_time=elapsed,
            complexity_score=routing_decision.complexity_metrics.complexity_score,
            cost_reduction=routing_decision.estimated_cost_reduction,
            individual_results=outcome.get("individual_results", {}),
            step_name="multi_thinking_processing",
        )

        logger.info(
            "Multi-thinking processing completed - Time: %.3fs, Cost reduction: %.1f%%, Output: %d chars",
            elapsed,
            routing_decision.estimated_cost_reduction,
            len(packaged.content),
        )

        return packaged

    except Exception as e:
        elapsed = time.time() - started_at
        logger.exception(
            f"Multi-thinking processing failed after {elapsed:.3f}s: {e}"
        )

        return MultiThinkingProcessingResult(
            content=f"Multi-thinking processing failed: {e!s}",
            strategy_used="error_fallback",
            thinking_sequence=[],
            processing_time=elapsed,
            complexity_score=0.0,
            cost_reduction=0.0,
            individual_results={},
            step_name="error_handling",
        )
async def _process_single_direction(
    self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
) -> dict[str, Any]:
    """Run the first direction of the strategy on its own.

    The enhanced model is used when that direction is SYNTHESIS, the
    standard model otherwise.

    Returns:
        A dict with ``"final_content"`` (the extracted agent output) and
        ``"individual_results"`` (the same text keyed by the direction
        value).
    """
    only_direction = decision.strategy.thinking_sequence[0]
    logger.info(f" SINGLE THINKING MODE: {only_direction.value}")

    wants_enhanced = only_direction == ThinkingDirection.SYNTHESIS
    if not wants_enhanced:
        model = self.model_config.create_standard_model()
        logger.info(" Using standard model for focused thinking")
    else:
        model = self.model_config.create_enhanced_model()
        logger.info(" Using enhanced model for synthesis thinking")

    agent = self.thinking_factory.create_thinking_agent(
        only_direction, model, context, {}
    )

    run_output = await agent.arun(input=thought_data.thought)
    text = self._extract_content(run_output)

    return {
        "final_content": text,
        "individual_results": {only_direction.value: text},
    }
async def _process_double_direction_sequence(
    self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
) -> dict[str, Any]:
    """Process a two-direction strategy with parallel execution.

    Non-synthesis directions run concurrently on the original thought. If
    either direction is SYNTHESIS, a synthesis agent then runs on the
    collected results and its output becomes the final content; otherwise
    the two results are combined by ``_combine_dual_thinking_results``.

    Returns:
        A dict with ``"final_content"`` and ``"individual_results"``
        (extracted text keyed by direction value).

    Raises:
        ValueError: If the strategy's thinking sequence does not contain
            exactly two directions (raised by the tuple unpacking).
    """
    direction1, direction2 = decision.strategy.thinking_sequence
    logger.info(
        f" DUAL THINKING SEQUENCE: {direction1.value} + {direction2.value} (parallel)"
    )

    requested = [direction1, direction2]
    has_synthesis = any(d == ThinkingDirection.SYNTHESIS for d in requested)
    parallel_directions = [
        d for d in requested if d != ThinkingDirection.SYNTHESIS
    ]

    # Both branches share the same fan-out; only the final step differs.
    individual_results = await self._run_directions_in_parallel(
        parallel_directions, thought_data, context
    )

    if not has_synthesis:
        # No synthesis agent: combine the two results programmatically.
        final_content = self._combine_dual_thinking_results(
            direction1,
            individual_results[direction1.value],
            direction2,
            individual_results[direction2.value],
            thought_data.thought,
        )
        return {
            "final_content": final_content,
            "individual_results": individual_results,
        }

    # Synthesis runs last so that it can see every parallel result.
    synthesis_direction = ThinkingDirection.SYNTHESIS
    model = self.model_config.create_enhanced_model()
    logger.info(f" Using enhanced model for {synthesis_direction.value} synthesis")

    synthesis_agent = self.thinking_factory.create_thinking_agent(
        synthesis_direction, model, context, individual_results
    )
    synthesis_input = self._build_synthesis_integration_input(
        thought_data.thought, individual_results
    )

    synthesis_result = await synthesis_agent.arun(input=synthesis_input)
    synthesis_content = self._extract_content(synthesis_result)
    individual_results[synthesis_direction.value] = synthesis_content

    logger.info(f" {synthesis_direction.value} thinking completed")

    return {
        "final_content": synthesis_content,
        "individual_results": individual_results,
    }

async def _run_directions_in_parallel(
    self, directions: list, thought_data: "ThoughtData", context: str
) -> dict[str, str]:
    """Run one standard-model agent per direction concurrently and collect the extracted text."""
    import asyncio

    pending = []
    for direction in directions:
        model = self.model_config.create_standard_model()
        logger.info(f" Using standard model for {direction.value} thinking (parallel)")

        agent = self.thinking_factory.create_thinking_agent(
            direction, model, context, {}
        )
        pending.append(agent.arun(input=thought_data.thought))

    logger.info(f" Executing {len(pending)} thinking agents in parallel")
    outputs = await asyncio.gather(*pending)

    # gather() returns results in submission order, one per awaitable, so
    # a length mismatch here would be a programming error.
    collected: dict[str, str] = {}
    for direction, output in zip(directions, outputs, strict=True):
        collected[direction.value] = self._extract_content(output)
        logger.info(f" {direction.value} thinking completed (parallel)")
    return collected
async def _process_triple_direction_sequence(
    self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
) -> dict[str, Any]:
    """Run every direction of the strategy concurrently and merge the results in code.

    Each direction gets a standard-model agent and receives the original
    thought. No synthesis agent is used here; the final content comes from
    ``_synthesize_triple_thinking_results``.

    Returns:
        A dict with ``"final_content"`` and ``"individual_results"``
        (extracted text keyed by direction value).
    """
    import asyncio

    thinking_sequence = decision.strategy.thinking_sequence
    logger.info(
        f" TRIPLE THINKING SEQUENCE: {' + '.join(direction.value for direction in thinking_sequence)} (parallel)"
    )

    scheduled = []
    pending = []
    for direction in thinking_sequence:
        logger.info(f" Preparing {direction.value} thinking for parallel execution")

        standard_model = self.model_config.create_standard_model()
        logger.info(f" Using standard model for {direction.value} thinking")

        worker = self.thinking_factory.create_thinking_agent(
            direction, standard_model, context, {}
        )

        # Every agent is given the original input, not another agent's output.
        scheduled.append(direction)
        pending.append(worker.arun(input=thought_data.thought))

    logger.info(f" Executing {len(pending)} thinking agents in parallel")
    outputs = await asyncio.gather(*pending)

    individual_results = {}
    for direction, output in zip(scheduled, outputs, strict=False):
        individual_results[direction.value] = self._extract_content(output)
        logger.info(f" {direction.value} thinking completed (parallel)")

    return {
        "final_content": self._synthesize_triple_thinking_results(
            individual_results, thinking_sequence, thought_data.thought
        ),
        "individual_results": individual_results,
    }
async def _process_full_direction_sequence(
    self, thought_data: "ThoughtData", context: str, decision: RoutingDecision
) -> dict[str, Any]:
    """Run the full strategy: optional orchestration, parallel fan-out, optional final synthesis.

    Step 1 runs only when the first direction is SYNTHESIS; its output is
    stored under ``"synthesis_initial"``. Step 2 runs every non-synthesis
    direction concurrently on the original thought. Step 3 runs when
    SYNTHESIS appears anywhere after the first position; its output is
    stored under ``"synthesis_final"`` and becomes the final content.
    Without a final synthesis, the answer comes from
    ``_synthesize_full_thinking_results``.

    Returns:
        A dict with ``"final_content"`` and ``"individual_results"``.
    """
    thinking_sequence = decision.strategy.thinking_sequence
    logger.info(
        " FULL THINKING SEQUENCE: Initial orchestration -> Parallel processing -> Final synthesis"
    )

    synthesis = ThinkingDirection.SYNTHESIS
    individual_results = {}

    # Step 1: initial orchestration, only when the sequence opens with SYNTHESIS.
    if thinking_sequence[0] == synthesis:
        logger.info(" Step 1: Initial synthesis orchestration")

        orchestration_model = self.model_config.create_enhanced_model()
        logger.info(" Using enhanced model for initial orchestration")

        orchestrator = self.thinking_factory.create_thinking_agent(
            synthesis, orchestration_model, context, {}
        )

        orchestration_output = await orchestrator.arun(input=thought_data.thought)
        individual_results["synthesis_initial"] = self._extract_content(
            orchestration_output
        )

        logger.info(" Initial orchestration completed")

    # Step 2: every non-synthesis direction runs concurrently.
    parallel_directions = [
        direction for direction in thinking_sequence if direction != synthesis
    ]

    if parallel_directions:
        logger.info(f" Step 2: Parallel execution of {len(parallel_directions)} thinking agents")

        import asyncio

        pending = []
        for direction in parallel_directions:
            logger.info(f" Preparing {direction.value} thinking for parallel execution")

            standard_model = self.model_config.create_standard_model()
            logger.info(f" Using standard model for {direction.value} thinking")

            worker = self.thinking_factory.create_thinking_agent(
                direction, standard_model, context, {}
            )
            pending.append(worker.arun(input=thought_data.thought))

        logger.info(f" Executing {len(pending)} thinking agents in parallel")
        outputs = await asyncio.gather(*pending)

        for direction, output in zip(parallel_directions, outputs, strict=False):
            individual_results[direction.value] = self._extract_content(output)
            logger.info(f" {direction.value} thinking completed (parallel)")

    # Step 3: final synthesis, when SYNTHESIS appears after the first position.
    wants_final_synthesis = any(
        direction == synthesis and position > 0
        for position, direction in enumerate(thinking_sequence)
    )

    if not wants_final_synthesis:
        # No synthesis agent at the end: merge the results in code instead.
        return {
            "final_content": self._synthesize_full_thinking_results(
                individual_results, thinking_sequence, thought_data.thought
            ),
            "individual_results": individual_results,
        }

    logger.info(" Step 3: Final synthesis integration")

    integration_model = self.model_config.create_enhanced_model()
    logger.info(" Using enhanced model for final synthesis")

    # Entries keyed "synthesis_*" (the initial orchestration) are not fed
    # back into the final integration.
    integration_results = {
        name: text
        for name, text in individual_results.items()
        if not name.startswith("synthesis_")
    }

    integrator = self.thinking_factory.create_thinking_agent(
        synthesis, integration_model, context, integration_results
    )
    integration_input = self._build_synthesis_integration_input(
        thought_data.thought, integration_results
    )

    integration_output = await integrator.arun(input=integration_input)
    final_answer = self._extract_content(integration_output)
    individual_results["synthesis_final"] = final_answer

    logger.info(" Final synthesis integration completed")

    return {
        "final_content": final_answer,
        "individual_results": individual_results,
    }
integration completed") 416 | 417 | # Use final synthesis result as the answer 418 | final_answer = final_content 419 | else: 420 | # No final synthesis - create programmatic synthesis 421 | final_answer = self._synthesize_full_thinking_results( 422 | individual_results, thinking_sequence, thought_data.thought 423 | ) 424 | 425 | return { 426 | "final_content": final_answer, 427 | "individual_results": individual_results, 428 | } 429 | 430 | def _extract_content(self, result: Any) -> str: 431 | """Extract content from agent execution result.""" 432 | if hasattr(result, "content"): 433 | content = result.content 434 | if isinstance(content, str): 435 | return content.strip() 436 | return str(content).strip() 437 | if isinstance(result, str): 438 | return result.strip() 439 | return str(result).strip() 440 | 441 | def _build_sequential_input( 442 | self, 443 | original_thought: str, 444 | previous_results: dict[str, str], 445 | current_direction: ThinkingDirection, 446 | ) -> str: 447 | """Build input for sequential processing.""" 448 | input_parts = [f"Original thought: {original_thought}", ""] 449 | 450 | if previous_results: 451 | input_parts.append("Previous analysis perspectives:") 452 | for direction_name, content in previous_results.items(): 453 | # Use generic descriptions instead of specific direction names 454 | perspective_name = self._get_generic_perspective_name(direction_name) 455 | input_parts.append( 456 | f" {perspective_name}: {content[:200]}{'...' 
if len(content) > 200 else ''}" 457 | ) 458 | input_parts.append("") 459 | 460 | # Use generic instruction instead of direction-specific instruction 461 | thinking_style = self._get_thinking_style_instruction(current_direction) 462 | input_parts.append(f"Now analyze this from a {thinking_style} perspective.") 463 | 464 | return "\n".join(input_parts) 465 | 466 | def _build_synthesis_integration_input( 467 | self, original_thought: str, all_results: dict[str, str] 468 | ) -> str: 469 | """Build synthesis integration input.""" 470 | input_parts = [ 471 | f"Original question: {original_thought}", 472 | "", 473 | "Collected insights from comprehensive analysis:", 474 | ] 475 | 476 | for direction_name, content in all_results.items(): 477 | if ( 478 | direction_name != "synthesis" 479 | ): # Avoid including previous synthesis results 480 | # Completely hide direction concepts, use generic analysis types 481 | perspective_name = self._get_generic_perspective_name(direction_name) 482 | input_parts.append(f"• {perspective_name}: {content}") 483 | 484 | input_parts.extend( 485 | [ 486 | "", 487 | "TASK: Synthesize all analysis insights into ONE comprehensive, unified answer.", 488 | "REQUIREMENTS:", 489 | "1. Provide a single, coherent response directly addressing the original question", 490 | "2. Integrate all insights naturally without mentioning different analysis types", 491 | "3. Do NOT list or separate different analysis perspectives in your response", 492 | "4. Do NOT use section headers or reference any specific analysis methods", 493 | "5. Do NOT mention 'direction', 'perspective', 'analysis type', or similar terms", 494 | "6. Write as a unified voice providing the final answer", 495 | "7. This will be the ONLY response the user sees - make it complete and standalone", 496 | "8. 
Your response should read as if it came from a single, integrated thought process", 497 | ] 498 | ) 499 | 500 | return "\n".join(input_parts) 501 | 502 | def _combine_dual_thinking_results( 503 | self, 504 | direction1: ThinkingDirection, 505 | content1: str, 506 | direction2: ThinkingDirection, 507 | content2: str, 508 | original_thought: str, 509 | ) -> str: 510 | """Combine dual thinking direction results.""" 511 | # If the second is synthesis thinking, return its result directly (should already be synthesized) 512 | if direction2 == ThinkingDirection.SYNTHESIS: 513 | return content2 514 | 515 | # Otherwise create synthesized answer without mentioning analysis methods 516 | if ( 517 | direction1 == ThinkingDirection.FACTUAL 518 | and direction2 == ThinkingDirection.EMOTIONAL 519 | ): 520 | return f"Regarding '{original_thought}': A comprehensive analysis reveals both objective realities and human emotional responses. {content1.lower()} while also recognizing that {content2.lower()} These complementary insights suggest a balanced approach that considers both factual evidence and human experience." 521 | if ( 522 | direction1 == ThinkingDirection.CRITICAL 523 | and direction2 == ThinkingDirection.OPTIMISTIC 524 | ): 525 | return f"Considering '{original_thought}': A thorough evaluation identifies both important concerns and significant opportunities. {content1.lower().strip('.')} while also recognizing promising aspects: {content2.lower()} A measured approach would address the concerns while pursuing the benefits." 526 | # Generic synthesis, completely hiding analysis structure 527 | return f"Analyzing '{original_thought}': A comprehensive evaluation reveals multiple important insights. {content1.lower().strip('.')} Additionally, {content2.lower()} Integrating these findings provides a well-rounded understanding that addresses the question from multiple angles." 
528 | 529 | def _synthesize_triple_thinking_results( 530 | self, 531 | results: dict[str, str], 532 | thinking_sequence: list[ThinkingDirection], 533 | original_thought: str, 534 | ) -> str: 535 | """Synthesize triple thinking direction results.""" 536 | # Create truly synthesized answer, hiding all analysis structure 537 | content_pieces = [] 538 | for thinking_direction in thinking_sequence: 539 | direction_name = thinking_direction.value 540 | content = results.get(direction_name, "") 541 | if content: 542 | # Extract core insights, completely hiding sources 543 | clean_content = content.strip().rstrip(".!") 544 | content_pieces.append(clean_content) 545 | 546 | if len(content_pieces) >= 3: 547 | # Synthesis of three or more perspectives, completely unified 548 | return f"""Considering the question '{original_thought}', a comprehensive analysis reveals several crucial insights. 549 | 550 | {content_pieces[0].lower()}, which establishes the foundation for understanding. This leads to recognizing that {content_pieces[1].lower()}, adding essential depth to our comprehension. Furthermore, {content_pieces[2].lower() if len(content_pieces) > 2 else ""} 551 | 552 | Drawing these insights together, the answer emerges as a unified understanding that acknowledges the full complexity while providing clear guidance.""" 553 | if len(content_pieces) == 2: 554 | return f"Addressing '{original_thought}': A thorough evaluation shows that {content_pieces[0].lower()}, and importantly, {content_pieces[1].lower()} Together, these insights form a comprehensive understanding." 555 | if len(content_pieces) == 1: 556 | return f"Regarding '{original_thought}': {content_pieces[0]}" 557 | return f"After comprehensive consideration of '{original_thought}', the analysis suggests this question merits deeper exploration to provide a complete answer." 
# NOTE(review): the helpers below that take ``self`` are methods of a class
# whose header is outside this view (presumably
# MultiThinkingSequentialProcessor) - confirm indentation when merging.


def _synthesize_full_thinking_results(
    self,
    results: dict[str, str],
    thinking_sequence: list[ThinkingDirection],
    original_thought: str,
) -> str:
    """Pick or build the final answer for a full multi-thinking run.

    Args:
        results: Collected outputs keyed by result name.
        thinking_sequence: Directions that were run, in order.
        original_thought: The question being answered.

    Returns:
        An existing non-empty synthesis result when one is present,
        otherwise the programmatic synthesis produced by
        ``_synthesize_triple_thinking_results``.
    """
    # Final synthesis output is stored under "synthesis_final" by the
    # full-sequence path; a bare "synthesis" key is still honoured for
    # callers that key results by direction value. "synthesis_initial" is
    # orchestration output, not an answer, so it is deliberately not used.
    for key in ("synthesis_final", "synthesis"):
        synthesis_result = results.get(key)
        if synthesis_result:
            return synthesis_result

    # No usable synthesis - build one from the individual results.
    return self._synthesize_triple_thinking_results(
        results, thinking_sequence, original_thought
    )


def _get_thinking_contribution(self, thinking_direction: ThinkingDirection) -> str:
    """Return a short description of what a thinking direction contributes.

    Falls back to ``"specialized thinking"`` for a direction that has no
    entry in the table.
    """
    contributions = {
        ThinkingDirection.FACTUAL: "factual information and objective data",
        ThinkingDirection.EMOTIONAL: "emotional insights and intuitive responses",
        ThinkingDirection.CRITICAL: "critical analysis and risk assessment",
        ThinkingDirection.OPTIMISTIC: "positive possibilities and value identification",
        ThinkingDirection.CREATIVE: "creative alternatives and innovative solutions",
        ThinkingDirection.SYNTHESIS: "process management and integrated thinking",
    }
    return contributions.get(thinking_direction, "specialized thinking")


def _get_generic_perspective_name(self, direction_name: str) -> str:
    """Map a direction name to a neutral analysis label.

    The lookup is case-insensitive. Names without an entry (including
    suffixed keys such as ``"synthesis_initial"``) yield ``"Analysis"``.
    """
    name_mapping = {
        "factual": "Factual analysis",
        "emotional": "Emotional considerations",
        "critical": "Risk assessment",
        "optimistic": "Opportunity analysis",
        "creative": "Creative exploration",
        "synthesis": "Strategic synthesis",
    }
    return name_mapping.get(direction_name.lower(), "Analysis")


def _get_thinking_style_instruction(
    self, thinking_direction: ThinkingDirection
) -> str:
    """Return adjectives describing a direction's style for use in prompts.

    Falls back to ``"analytical"`` for a direction that has no entry.
    """
    style_mapping = {
        ThinkingDirection.FACTUAL: "factual and objective",
        ThinkingDirection.EMOTIONAL: "emotional and intuitive",
        ThinkingDirection.CRITICAL: "critical and cautious",
        ThinkingDirection.OPTIMISTIC: "positive and optimistic",
        ThinkingDirection.CREATIVE: "creative and innovative",
        ThinkingDirection.SYNTHESIS: "strategic and integrative",
    }
    return style_mapping.get(thinking_direction, "analytical")


# Module-wide processor instance shared by the convenience function below;
# it is constructed once, when this module is imported.
_multi_thinking_processor = MultiThinkingSequentialProcessor()


async def process_with_multi_thinking(
    thought_data: "ThoughtData", context: str = ""
) -> MultiThinkingProcessingResult:
    """Process a thought with the shared module-level processor.

    Args:
        thought_data: The thought to process.
        context: Optional context string forwarded unchanged.

    Returns:
        Whatever the shared processor's ``process_with_multi_thinking``
        returns for these arguments.
    """
    return await _multi_thinking_processor.process_with_multi_thinking(
        thought_data, context
    )


def create_multi_thinking_step_output(
    result: MultiThinkingProcessingResult,
) -> StepOutput:
    """Convert a multi-thinking processing result to an Agno ``StepOutput``.

    Copies ``content`` and ``step_name`` from ``result``.
    """
    # NOTE(review): success is always reported as True here, regardless of
    # anything carried on `result` - confirm that is intended.
    return StepOutput(
        content=result.content,
        success=True,
        step_name=result.step_name,
    )