This is page 2 of 2. Use http://codebase.md/fradser/mcp-server-mas-sequential-thinking?page={x} to view the full context. # Directory Structure ``` ├── .env.example ├── .gitignore ├── .python-version ├── CHANGELOG.md ├── CLAUDE.md ├── Dockerfile ├── pyproject.toml ├── README.md ├── README.zh-CN.md ├── smithery.yaml ├── src │ └── mcp_server_mas_sequential_thinking │ ├── __init__.py │ ├── config │ │ ├── __init__.py │ │ ├── constants.py │ │ └── modernized_config.py │ ├── core │ │ ├── __init__.py │ │ ├── models.py │ │ ├── session.py │ │ └── types.py │ ├── infrastructure │ │ ├── __init__.py │ │ ├── logging_config.py │ │ └── persistent_memory.py │ ├── main.py │ ├── processors │ │ ├── __init__.py │ │ ├── multi_thinking_core.py │ │ └── multi_thinking_processor.py │ ├── routing │ │ ├── __init__.py │ │ ├── agno_workflow_router.py │ │ ├── ai_complexity_analyzer.py │ │ ├── complexity_types.py │ │ └── multi_thinking_router.py │ ├── services │ │ ├── __init__.py │ │ ├── context_builder.py │ │ ├── processing_orchestrator.py │ │ ├── response_formatter.py │ │ ├── response_processor.py │ │ ├── retry_handler.py │ │ ├── server_core.py │ │ ├── thought_processor_refactored.py │ │ └── workflow_executor.py │ └── utils │ ├── __init__.py │ └── utils.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/processors/multi_thinking_core.py: -------------------------------------------------------------------------------- ```python """Multi-Thinking Core Agent Architecture. Multi-dimensional thinking direction Agent core implementation. Strictly follows "focused thinking direction" principles, supporting intelligent sequences from single to multiple thinking modes. """ # Lazy import to break circular dependency import logging import os from abc import abstractmethod from dataclasses import dataclass from enum import Enum from typing import Any from agno.agent import Agent from agno.models.base import Model from agno.tools.reasoning import ReasoningTools # Try to import ExaTools, gracefully handle if not available try: from agno.tools.exa import ExaTools EXA_AVAILABLE = bool(os.environ.get("EXA_API_KEY")) except ImportError: ExaTools = None EXA_AVAILABLE = False logger = logging.getLogger(__name__) class ThinkingDirection(Enum): """Multi-thinking direction enumeration.""" FACTUAL = "factual" # Facts and data EMOTIONAL = "emotional" # Emotions and intuition CRITICAL = "critical" # Criticism and risk OPTIMISTIC = "optimistic" # Optimism and value CREATIVE = "creative" # Creativity and innovation SYNTHESIS = "synthesis" # Metacognition and integration class ProcessingDepth(Enum): """Processing complexity levels.""" SINGLE = "single" # Single thinking mode DOUBLE = "double" # Double thinking sequence TRIPLE = "triple" # Triple thinking sequence FULL = "full" # Complete multi-thinking @dataclass(frozen=True) class ThinkingTimingConfig: """Thinking direction timing configuration.""" direction: ThinkingDirection default_time_seconds: int min_time_seconds: int max_time_seconds: int is_quick_reaction: bool = ( False # Whether it's a quick reaction mode (like emotional thinking) ) # Timing configuration constants THINKING_TIMING_CONFIGS = { ThinkingDirection.FACTUAL: ThinkingTimingConfig( ThinkingDirection.FACTUAL, 120, 60, 300, False ), ThinkingDirection.EMOTIONAL: ThinkingTimingConfig( ThinkingDirection.EMOTIONAL, 30, 15, 60, True ), # Quick intuition ThinkingDirection.CRITICAL: ThinkingTimingConfig( ThinkingDirection.CRITICAL, 120, 60, 240, False ), ThinkingDirection.OPTIMISTIC: ThinkingTimingConfig( ThinkingDirection.OPTIMISTIC, 120, 60, 240, False ), ThinkingDirection.CREATIVE: ThinkingTimingConfig( ThinkingDirection.CREATIVE, 240, 120, 360, False ), # Creativity requires more time ThinkingDirection.SYNTHESIS: ThinkingTimingConfig( ThinkingDirection.SYNTHESIS, 60, 30, 120, False ), } @dataclass(frozen=True) class ThinkingCapability: """Multi-thinking capability definition.""" thinking_direction: ThinkingDirection role: str description: str role_description: str # Cognitive characteristics thinking_mode: str cognitive_focus: str output_style: str # Time management timing_config: ThinkingTimingConfig # Enhanced features tools: list[Any] | None = None reasoning_level: int = 1 memory_enabled: bool = False def __post_init__(self): if self.tools is None: # Default tools for all thinking directions tools = [ReasoningTools] # Add ExaTools for all thinking directions except SYNTHESIS if ( EXA_AVAILABLE and ExaTools is not None and self.thinking_direction != ThinkingDirection.SYNTHESIS ): tools.append(ExaTools) object.__setattr__(self, "tools", tools) def get_instructions( self, context: str = "", previous_results: dict | None = None ) -> list[str]: """Generate specific analysis mode instructions.""" base_instructions = [ f"You are operating in {self.thinking_mode} analysis mode.", f"Role: {self.role}", f"Cognitive Focus: {self.cognitive_focus}", "", "CORE PRINCIPLES:", f"1. Apply ONLY {self.thinking_mode} approach - maintain strict focus", f"2. Time allocation: {self.timing_config.default_time_seconds} seconds for thorough analysis", f"3. Output approach: {self.output_style}", "", f"Your specific responsibility: {self.role_description}", ] # Add specific analysis mode detailed instructions specific_instructions = self._get_specific_instructions() base_instructions.extend(specific_instructions) # Add context and previous results if context: base_instructions.extend( [ "", f"Context: {context}", ] ) if previous_results: base_instructions.extend( [ "", "Previous analysis insights from other perspectives:", *[ f" {self._format_previous_result_label(direction_name)}: {result[:100]}..." for direction_name, result in previous_results.items() ], ] ) return base_instructions def _format_previous_result_label(self, direction_name: str) -> str: """Format previous result labels, using thinking direction concepts.""" label_mapping = { "factual": "Factual analysis", "emotional": "Emotional perspective", "critical": "Critical evaluation", "optimistic": "Optimistic view", "creative": "Creative thinking", "synthesis": "Strategic synthesis", } return label_mapping.get(direction_name.lower(), "Analysis") @abstractmethod def _get_specific_instructions(self) -> list[str]: """Get specific thinking direction detailed instructions.""" class FactualThinkingCapability(ThinkingCapability): """Factual thinking capability: facts and data.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.FACTUAL, role="Factual Information Processor", description="Collect and process objective facts and data", role_description="I focus only on objective facts and data. I provide neutral information without personal interpretation.", thinking_mode="analytical_factual", cognitive_focus="Pure information processing, zero emotion or judgment", output_style="Objective fact lists, data-driven information", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.FACTUAL], reasoning_level=2, memory_enabled=False, ) def _get_specific_instructions(self) -> list[str]: instructions = [ "", "FACTUAL ANALYSIS GUIDELINES:", "• Use simple statements to present facts: 'The data shows...', 'Known information is...'", "• Avoid technical jargon, explain data in everyday language", "• Present only verified facts and objective data", "• Avoid opinions, interpretations, or emotional reactions", "• Identify what information is missing and needed", "• Separate facts from assumptions clearly", ] # Add research capabilities if ExaTools is available if EXA_AVAILABLE and ExaTools is not None: instructions.extend( [ "", "RESEARCH CAPABILITIES:", "• Use search_exa() to find current facts and data when needed", "• Search for recent information, statistics, or verified data", "• Cite sources when presenting factual information", "• Prioritize authoritative sources and recent data", ] ) instructions.extend( [ "", "FORBIDDEN in factual analysis mode:", "- Personal opinions or judgments", "- Emotional responses or gut feelings", "- Speculation or 'what if' scenarios", "- Value judgments (good/bad, right/wrong)", ] ) return instructions class EmotionalThinkingCapability(ThinkingCapability): """Emotional thinking capability: emotions and intuition.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.EMOTIONAL, role="Intuitive Emotional Processor", description="Provide emotional responses and intuitive insights", role_description="I express intuition and emotional reactions. No reasoning needed, just share feelings.", thinking_mode="intuitive_emotional", cognitive_focus="Emotional intelligence and intuitive processing", output_style="Intuitive reactions, emotional expression, humanized perspective", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.EMOTIONAL], reasoning_level=1, # Lowest rationality, highest intuition memory_enabled=False, ) def _get_specific_instructions(self) -> list[str]: return [ "", "EMOTIONAL INTUITIVE GUIDELINES:", "• Start responses with 'I feel...', 'My intuition tells me...', 'My gut reaction is...'", "• Keep expressions brief and powerful - 30-second emotional snapshots", "• Express immediate gut reactions and feelings", "• Share intuitive hunches without justification", "• Include visceral, immediate responses", "• NO need to explain or justify feelings", "", "ENCOURAGED in emotional intuitive mode:", "- First impressions and gut reactions", "- Emotional responses to ideas or situations", "- Intuitive concerns or excitement", "- 'Sixth sense' about what might work", "", "Remember: This is a 30-second emotional snapshot, not analysis!", ] class CriticalThinkingCapability(ThinkingCapability): """Critical thinking capability: criticism and risk.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.CRITICAL, role="Critical Risk Assessor", description="Critical analysis and risk identification", role_description="I identify risks and problems. Critical but not pessimistic, I point out real difficulties.", thinking_mode="critical_analytical", cognitive_focus="Critical thinking and risk assessment", output_style="Sharp questioning, risk warnings, logical verification", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.CRITICAL], reasoning_level=3, # High logical reasoning memory_enabled=False, ) def _get_specific_instructions(self) -> list[str]: instructions = [ "", "CRITICAL ASSESSMENT GUIDELINES:", "• Point out specific possible problems, not general pessimism", "• Use phrases like 'The risk is...', 'This could fail because...', 'A problem might be...'", "• Identify potential problems, risks, and weaknesses", "• Challenge assumptions and look for logical flaws", "• Consider worst-case scenarios and failure modes", "• Provide logical reasons for all concerns raised", ] # Add research capabilities if ExaTools is available if EXA_AVAILABLE and ExaTools is not None: instructions.extend( [ "", "RESEARCH FOR CRITICAL ANALYSIS:", "• Search for counterexamples, failed cases, or criticism of similar ideas", "• Look for expert opinions that identify risks or problems", "• Find case studies of failures in similar contexts", "• Research regulatory or compliance issues that might apply", ] ) instructions.extend( [ "", "KEY AREAS TO EXAMINE:", "- Logical inconsistencies in arguments", "- Practical obstacles and implementation challenges", "- Resource constraints and limitations", "- Potential negative consequences", "- Missing information or unproven assumptions", "", "Note: Be critical but constructive - identify real problems, not just pessimism.", ] ) return instructions class OptimisticThinkingCapability(ThinkingCapability): """Optimistic thinking capability: optimism and value.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.OPTIMISTIC, role="Optimistic Value Explorer", description="Positive thinking and value discovery", role_description="I find value and opportunities. Realistic optimism, I discover genuine benefits.", thinking_mode="optimistic_constructive", cognitive_focus="Positive psychology and opportunity identification", output_style="Positive exploration, value discovery, opportunity identification", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.OPTIMISTIC], reasoning_level=2, memory_enabled=False, ) def _get_specific_instructions(self) -> list[str]: instructions = [ "", "OPTIMISTIC VALUE EXPLORATION GUIDELINES:", "• Point out specific feasible benefits, not empty praise", "• Use phrases like 'The benefit is...', 'This creates... value', 'An opportunity here is...'", "• Focus on benefits, values, and positive outcomes", "• Explore best-case scenarios and opportunities", "• Identify feasible positive possibilities", "• Provide logical reasons for optimism", ] # Add research capabilities if ExaTools is available if EXA_AVAILABLE and ExaTools is not None: instructions.extend( [ "", "RESEARCH FOR OPTIMISTIC ANALYSIS:", "• Search for success stories and positive case studies", "• Look for evidence of benefits in similar situations", "• Find research supporting potential positive outcomes", "• Research market opportunities and growth potential", ] ) instructions.extend( [ "", "KEY AREAS TO EXPLORE:", "- Benefits and positive outcomes", "- Opportunities for growth or improvement", "- Feasible best-case scenarios", "- Value creation possibilities", "- Strengths and positive aspects", "- Why this could work well", "", "Note: Be realistically optimistic - find genuine value, not false hope.", ] ) return instructions class CreativeThinkingCapability(ThinkingCapability): """Creative thinking capability: creativity and innovation.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.CREATIVE, role="Creative Innovation Generator", description="Creative thinking and innovative solutions", role_description="I generate new ideas and alternative approaches. I break conventional limits and explore possibilities.", thinking_mode="creative_generative", cognitive_focus="Divergent thinking and creativity", output_style="Novel ideas, innovative solutions, alternative thinking", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.CREATIVE], reasoning_level=2, memory_enabled=True, # Creativity may require memory combination ) def _get_specific_instructions(self) -> list[str]: instructions = [ "", "CREATIVE INNOVATION GUIDELINES:", "• Provide 3-5 specific creative ideas that could work", "• Use phrases like 'What if...', 'Another approach could be...', 'An alternative is...'", "• Generate new ideas, alternatives, and creative solutions", "• Think laterally - explore unconventional approaches", "• Break normal thinking patterns and assumptions", "• Suggest modifications, improvements, or entirely new approaches", ] # Add research capabilities if ExaTools is available if EXA_AVAILABLE and ExaTools is not None: instructions.extend( [ "", "RESEARCH FOR CREATIVE INSPIRATION:", "• Search for innovative solutions in different industries", "• Look for creative approaches used in unrelated fields", "• Find emerging trends and new methodologies", "• Research breakthrough innovations and creative disruptions", ] ) instructions.extend( [ "", "CREATIVE TECHNIQUES TO USE:", "- Lateral thinking and analogies", "- Random word associations", "- 'What if' scenarios and thought experiments", "- Reversal thinking (what's the opposite?)", "- Combination of unrelated elements", "- Alternative perspectives and viewpoints", "", "Note: Quantity over quality - generate many ideas without judgment.", ] ) return instructions class SynthesisThinkingCapability(ThinkingCapability): """Synthesis thinking capability: metacognition and integration.""" def __init__(self) -> None: super().__init__( thinking_direction=ThinkingDirection.SYNTHESIS, role="Metacognitive Orchestrator", description="Thinking process management and comprehensive coordination", role_description="I integrate all perspectives and provide the final balanced answer. My output is what users see - it must be practical and human-friendly.", thinking_mode="metacognitive_synthetic", cognitive_focus="Metacognition and executive control", output_style="Comprehensive integration, process management, unified conclusions", timing_config=THINKING_TIMING_CONFIGS[ThinkingDirection.SYNTHESIS], reasoning_level=3, # Highest level of metacognition memory_enabled=True, # Need to remember all other thinking direction results ) def _get_specific_instructions(self) -> list[str]: return [ "", "STRATEGIC SYNTHESIS GUIDELINES:", "• Your primary goal: Answer the original question using insights from other analyses", "• Avoid generic rehashing - focus specifically on the question asked", "• Use other analyses' contributions as evidence/perspectives to build your answer", "• Provide practical, actionable insights users can understand", "", "CRITICAL QUESTION-FOCUSED APPROACH:", "1. Extract insights from other analyses that directly address the question", "2. Ignore generic statements - focus on question-relevant content", "3. Build a coherent answer that uses multiple perspectives as support", "4. End with a clear, direct response to what was originally asked", "", "KEY RESPONSIBILITIES:", "- Return to the original question and answer it directly", "- Use other analyses' insights as building blocks for your answer", "- Synthesize perspectives into a cohesive response to the specific question", "- Avoid academic summarization - focus on practical question-answering", "- Ensure your entire response serves the original question", "", "FINAL OUTPUT REQUIREMENTS:", "• This is the user's ONLY answer - it must directly address their question", "• Don't just summarize - synthesize into a clear answer", "• Remove content that doesn't directly relate to the original question", "• For philosophical questions: provide thoughtful answers, not just analysis", "• You may mention different analysis types, perspectives, or methodologies if relevant", "• Present your synthesis process transparently, showing how different viewpoints contribute", ] class MultiThinkingAgentFactory: """Multi-thinking Agent factory.""" # Thinking direction capability mapping THINKING_CAPABILITIES = { ThinkingDirection.FACTUAL: FactualThinkingCapability(), ThinkingDirection.EMOTIONAL: EmotionalThinkingCapability(), ThinkingDirection.CRITICAL: CriticalThinkingCapability(), ThinkingDirection.OPTIMISTIC: OptimisticThinkingCapability(), ThinkingDirection.CREATIVE: CreativeThinkingCapability(), ThinkingDirection.SYNTHESIS: SynthesisThinkingCapability(), } def __init__(self) -> None: self._agent_cache: dict[str, Agent] = {} # Cache for created agents def create_thinking_agent( self, thinking_direction: ThinkingDirection, model: Model, context: str = "", previous_results: dict | None = None, **kwargs, ) -> Agent: """Create a specific thinking direction Agent.""" capability = self.THINKING_CAPABILITIES[thinking_direction] # Generate cache key cache_key = ( f"{thinking_direction.value}_{model.__class__.__name__}_{hash(context)}" ) if cache_key in self._agent_cache: # Return cached agent but update instructions agent = self._agent_cache[cache_key] agent.instructions = capability.get_instructions(context, previous_results) return agent # Create new agent agent = Agent( name=f"{thinking_direction.value.title()}AnalysisAgent", role=capability.role, description=capability.description, model=model, tools=capability.tools if capability.tools else None, instructions=capability.get_instructions(context, previous_results), markdown=True, **kwargs, ) # Add special configuration if capability.memory_enabled: agent.enable_user_memories = True # Cache agent self._agent_cache[cache_key] = agent logger.info( f"Created {thinking_direction.value} thinking agent with {capability.timing_config.default_time_seconds}s time limit" ) return agent def get_thinking_timing( self, thinking_direction: ThinkingDirection ) -> ThinkingTimingConfig: """Get specific thinking direction timing configuration.""" return THINKING_TIMING_CONFIGS[thinking_direction] def get_all_thinking_directions(self) -> list[ThinkingDirection]: """Get all available thinking directions.""" return list(self.THINKING_CAPABILITIES.keys()) def clear_cache(self) -> None: """Clear agent cache.""" self._agent_cache.clear() logger.info("Thinking agent cache cleared") # Global factory instance _thinking_factory = MultiThinkingAgentFactory() # Convenience functions def create_thinking_agent( thinking_direction: ThinkingDirection, model: Model, **kwargs ) -> Agent: """Convenience function to create thinking Agent.""" return _thinking_factory.create_thinking_agent(thinking_direction, model, **kwargs) def get_thinking_timing(thinking_direction: ThinkingDirection) -> ThinkingTimingConfig: """Convenience function to get thinking timing configuration.""" return _thinking_factory.get_thinking_timing(thinking_direction) def get_all_thinking_directions() -> list[ThinkingDirection]: """Convenience function to get all thinking directions.""" return _thinking_factory.get_all_thinking_directions() ``` -------------------------------------------------------------------------------- /src/mcp_server_mas_sequential_thinking/processors/multi_thinking_processor.py: -------------------------------------------------------------------------------- ```python """Multi-Thinking Sequential Processor. Implements a parallel processor based on multi-directional thinking methodology, integrated with the Agno Workflow system. Supports intelligent parallel processing from single direction to full multi-direction analysis. """ # Lazy import to break circular dependency import logging import time from dataclasses import dataclass from typing import TYPE_CHECKING, Any from agno.workflow.types import StepOutput if TYPE_CHECKING: from mcp_server_mas_sequential_thinking.core.models import ThoughtData logger = logging.getLogger(__name__) from mcp_server_mas_sequential_thinking.config.modernized_config import get_model_config from mcp_server_mas_sequential_thinking.routing.multi_thinking_router import ( MultiThinkingIntelligentRouter, RoutingDecision, ) from .multi_thinking_core import ( MultiThinkingAgentFactory, ProcessingDepth, ThinkingDirection, ) # logger already defined above @dataclass class MultiThinkingProcessingResult: """Multi-thinking processing result.""" content: str strategy_used: str thinking_sequence: list[str] processing_time: float complexity_score: float cost_reduction: float individual_results: dict[str, str] # Results from each thinking direction step_name: str class MultiThinkingSequentialProcessor: """Multi-thinking parallel processor.""" def __init__(self) -> None: self.model_config = get_model_config() self.thinking_factory = MultiThinkingAgentFactory() self.router = MultiThinkingIntelligentRouter() async def process_with_multi_thinking( self, thought_data: "ThoughtData", context_prompt: str = "" ) -> MultiThinkingProcessingResult: """Process thoughts using multi-thinking methodology with parallel execution.""" start_time = time.time() logger.info("Multi-thinking processing started") if logger.isEnabledFor(logging.INFO): logger.info("Input preview: %s", thought_data.thought[:100]) logger.info("Context length: %d chars", len(context_prompt)) try: # Step 1: Intelligent routing decision routing_decision = await self.router.route_thought(thought_data) logger.info("Selected strategy: %s", routing_decision.strategy.name) if logger.isEnabledFor(logging.INFO): sequence = [ direction.value for direction in routing_decision.strategy.thinking_sequence ] logger.info("Thinking sequence: %s", sequence) # Step 2: Execute processing based on complexity if routing_decision.strategy.complexity == ProcessingDepth.SINGLE: result = await self._process_single_direction( thought_data, context_prompt, routing_decision ) elif routing_decision.strategy.complexity == ProcessingDepth.DOUBLE: result = await self._process_double_direction_sequence( thought_data, context_prompt, routing_decision ) elif routing_decision.strategy.complexity == ProcessingDepth.TRIPLE: result = await self._process_triple_direction_sequence( thought_data, context_prompt, routing_decision ) else: # FULL result = await self._process_full_direction_sequence( thought_data, context_prompt, routing_decision ) processing_time = time.time() - start_time # Create final result final_result = MultiThinkingProcessingResult( content=result["final_content"], strategy_used=routing_decision.strategy.name, thinking_sequence=[ direction.value for direction in routing_decision.strategy.thinking_sequence ], processing_time=processing_time, complexity_score=routing_decision.complexity_metrics.complexity_score, cost_reduction=routing_decision.estimated_cost_reduction, individual_results=result.get("individual_results", {}), step_name="multi_thinking_processing", ) logger.info( "Multi-thinking processing completed - Time: %.3fs, Cost reduction: %.1f%%, Output: %d chars", processing_time, routing_decision.estimated_cost_reduction, len(final_result.content), ) return final_result except Exception as e: processing_time = time.time() - start_time logger.exception( f"Multi-thinking processing failed after {processing_time:.3f}s: {e}" ) return MultiThinkingProcessingResult( content=f"Multi-thinking processing failed: {e!s}", strategy_used="error_fallback", thinking_sequence=[], processing_time=processing_time, complexity_score=0.0, cost_reduction=0.0, individual_results={}, step_name="error_handling", ) async def _process_single_direction( self, thought_data: "ThoughtData", context: str, decision: RoutingDecision ) -> dict[str, Any]: """Process single thinking direction mode.""" thinking_direction = decision.strategy.thinking_sequence[0] logger.info(f" SINGLE THINKING MODE: {thinking_direction.value}") # Use enhanced model for synthesis thinking, standard model for other directions if thinking_direction == ThinkingDirection.SYNTHESIS: model = self.model_config.create_enhanced_model() logger.info(" Using enhanced model for synthesis thinking") else: model = self.model_config.create_standard_model() logger.info(" Using standard model for focused thinking") agent = self.thinking_factory.create_thinking_agent( thinking_direction, model, context, {} ) # Execute processing result = await agent.arun(input=thought_data.thought) # Extract content content = self._extract_content(result) return { "final_content": content, "individual_results": {thinking_direction.value: content}, } async def _process_double_direction_sequence( self, thought_data: "ThoughtData", context: str, decision: RoutingDecision ) -> dict[str, Any]: """Process dual thinking direction sequence with parallel execution.""" direction1, direction2 = decision.strategy.thinking_sequence logger.info( f" DUAL THINKING SEQUENCE: {direction1.value} + {direction2.value} (parallel)" ) individual_results = {} # Check if synthesis agent is involved has_synthesis = any(d == ThinkingDirection.SYNTHESIS for d in [direction1, direction2]) if has_synthesis: # If synthesis is involved, run non-synthesis agents in parallel, then synthesis non_synthesis_directions = [d for d in [direction1, direction2] if d != ThinkingDirection.SYNTHESIS] synthesis_direction = ThinkingDirection.SYNTHESIS # Run non-synthesis agents in parallel import asyncio tasks = [] for direction in non_synthesis_directions: model = self.model_config.create_standard_model() logger.info(f" Using standard model for {direction.value} thinking (parallel)") agent = self.thinking_factory.create_thinking_agent( direction, model, context, {} ) task = agent.arun(input=thought_data.thought) tasks.append((direction, task)) # Execute parallel tasks logger.info(f" Executing {len(tasks)} thinking agents in parallel") parallel_results = await asyncio.gather(*[task for _, task in tasks]) # Process parallel results for (direction, _), result in zip(tasks, parallel_results, strict=False): content = self._extract_content(result) individual_results[direction.value] = content logger.info(f" {direction.value} thinking completed (parallel)") # Run synthesis agent with all parallel results model = self.model_config.create_enhanced_model() logger.info(f" Using enhanced model for {synthesis_direction.value} synthesis") synthesis_agent = self.thinking_factory.create_thinking_agent( synthesis_direction, model, context, individual_results ) # Build synthesis input synthesis_input = self._build_synthesis_integration_input( thought_data.thought, individual_results ) synthesis_result = await synthesis_agent.arun(input=synthesis_input) synthesis_content = self._extract_content(synthesis_result) individual_results[synthesis_direction.value] = synthesis_content logger.info(f" {synthesis_direction.value} thinking completed") final_content = synthesis_content else: # No synthesis agent - run both agents in parallel import asyncio tasks = [] for direction in [direction1, direction2]: model = self.model_config.create_standard_model() logger.info(f" Using standard model for {direction.value} thinking (parallel)") agent = self.thinking_factory.create_thinking_agent( direction, model, context, {} ) task = agent.arun(input=thought_data.thought) tasks.append((direction, task)) # Execute parallel tasks logger.info(" Executing 2 thinking agents in parallel") parallel_results = await asyncio.gather(*[task for _, task in tasks]) # Process parallel results for (direction, _), result in zip(tasks, parallel_results, strict=False): content = self._extract_content(result) individual_results[direction.value] = content logger.info(f" {direction.value} thinking completed (parallel)") # Combine results programmatically final_content = self._combine_dual_thinking_results( direction1, individual_results[direction1.value], direction2, individual_results[direction2.value], thought_data.thought ) return { "final_content": final_content, "individual_results": individual_results, } async def _process_triple_direction_sequence( self, thought_data: "ThoughtData", context: str, decision: RoutingDecision ) -> dict[str, Any]: """Process triple thinking direction sequence with parallel execution.""" thinking_sequence = decision.strategy.thinking_sequence logger.info( f" TRIPLE THINKING SEQUENCE: {' + '.join(direction.value for direction in thinking_sequence)} (parallel)" ) individual_results = {} # Triple strategy currently uses FACTUAL + CREATIVE + CRITICAL - all run in parallel import asyncio tasks = [] for thinking_direction in thinking_sequence: logger.info(f" Preparing {thinking_direction.value} thinking for parallel execution") # All agents use standard model (no synthesis in triple strategy) model = self.model_config.create_standard_model() logger.info(f" Using standard model for {thinking_direction.value} thinking") agent = self.thinking_factory.create_thinking_agent( thinking_direction, model, context, {} ) # All agents receive original input directly (parallel processing) task = agent.arun(input=thought_data.thought) tasks.append((thinking_direction, task)) # Execute all thinking directions in parallel logger.info(f" Executing {len(tasks)} thinking agents in parallel") parallel_results = await asyncio.gather(*[task for _, task in tasks]) # Process parallel results for (thinking_direction, _), result in zip(tasks, parallel_results, strict=False): content = self._extract_content(result) individual_results[thinking_direction.value] = content logger.info(f" {thinking_direction.value} thinking completed (parallel)") # Create programmatic synthesis (no synthesis agent in triple strategy) final_content = self._synthesize_triple_thinking_results( individual_results, thinking_sequence, thought_data.thought ) return { "final_content": final_content, "individual_results": individual_results, } async def _process_full_direction_sequence( self, thought_data: "ThoughtData", context: str, decision: RoutingDecision ) -> dict[str, Any]: """Process full multi-thinking direction sequence with parallel execution.""" thinking_sequence = decision.strategy.thinking_sequence logger.info( " FULL THINKING SEQUENCE: Initial orchestration -> Parallel processing -> Final synthesis" ) individual_results = {} # Step 1: Initial SYNTHESIS for orchestration (if first agent is SYNTHESIS) if thinking_sequence[0] == ThinkingDirection.SYNTHESIS: logger.info(" Step 1: Initial synthesis orchestration") initial_synthesis_model = self.model_config.create_enhanced_model() logger.info(" Using enhanced model for initial orchestration") initial_synthesis_agent = self.thinking_factory.create_thinking_agent( ThinkingDirection.SYNTHESIS, initial_synthesis_model, context, {} ) initial_result = await initial_synthesis_agent.arun(input=thought_data.thought) initial_content = self._extract_content(initial_result) individual_results["synthesis_initial"] = initial_content logger.info(" Initial orchestration completed") # Step 2: Identify non-synthesis agents for parallel execution non_synthesis_agents = [ direction for direction in thinking_sequence if direction != ThinkingDirection.SYNTHESIS ] if non_synthesis_agents: logger.info(f" Step 2: Parallel execution of {len(non_synthesis_agents)} thinking agents") import asyncio tasks = [] for thinking_direction in non_synthesis_agents: logger.info(f" Preparing {thinking_direction.value} thinking for parallel execution") model = self.model_config.create_standard_model() logger.info(f" Using standard model for {thinking_direction.value} thinking") agent = self.thinking_factory.create_thinking_agent( thinking_direction, model, context, {} ) # All non-synthesis agents receive original input (parallel processing) task = agent.arun(input=thought_data.thought) tasks.append((thinking_direction, task)) # Execute all non-synthesis agents in parallel logger.info(f" Executing {len(tasks)} thinking agents in parallel") parallel_results = await asyncio.gather(*[task for _, task in tasks]) # Process parallel results for (thinking_direction, _), result in zip(tasks, parallel_results, strict=False): content = self._extract_content(result) individual_results[thinking_direction.value] = content logger.info(f" {thinking_direction.value} thinking completed (parallel)") # Step 3: Final SYNTHESIS for integration (if last agent is SYNTHESIS) final_synthesis_agents = [ i for i, direction in enumerate(thinking_sequence) if direction == ThinkingDirection.SYNTHESIS and i > 0 ] if final_synthesis_agents: logger.info(" Step 3: Final synthesis integration") final_synthesis_model = self.model_config.create_enhanced_model() logger.info(" Using enhanced model for final synthesis") # Remove initial synthesis from results for final integration integration_results = { k: v for k, v in individual_results.items() if not k.startswith("synthesis_") } final_synthesis_agent = self.thinking_factory.create_thinking_agent( ThinkingDirection.SYNTHESIS, final_synthesis_model, context, integration_results ) # Build synthesis integration input synthesis_input = self._build_synthesis_integration_input( thought_data.thought, integration_results ) final_result = await final_synthesis_agent.arun(input=synthesis_input) final_content = self._extract_content(final_result) individual_results["synthesis_final"] = final_content logger.info(" Final synthesis integration completed") # Use final synthesis result as the answer final_answer = final_content else: # No final synthesis - create programmatic synthesis final_answer = self._synthesize_full_thinking_results( individual_results, thinking_sequence, thought_data.thought ) return { "final_content": final_answer, "individual_results": individual_results, } def _extract_content(self, result: Any) -> str: """Extract content from agent execution result.""" if hasattr(result, "content"): content = result.content if isinstance(content, str): return content.strip() return str(content).strip() if isinstance(result, str): return result.strip() return str(result).strip() def _build_sequential_input( self, original_thought: str, previous_results: dict[str, str], current_direction: ThinkingDirection, ) -> str: """Build input for sequential processing.""" input_parts = [f"Original thought: {original_thought}", ""] if previous_results: input_parts.append("Previous analysis perspectives:") for direction_name, content in previous_results.items(): # Use generic descriptions instead of specific direction names perspective_name = self._get_generic_perspective_name(direction_name) input_parts.append( f" {perspective_name}: {content[:200]}{'...' if len(content) > 200 else ''}" ) input_parts.append("") # Use generic instruction instead of direction-specific instruction thinking_style = self._get_thinking_style_instruction(current_direction) input_parts.append(f"Now analyze this from a {thinking_style} perspective.") return "\n".join(input_parts) def _build_synthesis_integration_input( self, original_thought: str, all_results: dict[str, str] ) -> str: """Build synthesis integration input.""" input_parts = [ f"Original question: {original_thought}", "", "Collected insights from comprehensive analysis:", ] for direction_name, content in all_results.items(): if ( direction_name != "synthesis" ): # Avoid including previous synthesis results # Completely hide direction concepts, use generic analysis types perspective_name = self._get_generic_perspective_name(direction_name) input_parts.append(f"• {perspective_name}: {content}") input_parts.extend( [ "", "TASK: Synthesize all analysis insights into ONE comprehensive, unified answer.", "REQUIREMENTS:", "1. Provide a single, coherent response directly addressing the original question", "2. Integrate all insights naturally without mentioning different analysis types", "3. Do NOT list or separate different analysis perspectives in your response", "4. Do NOT use section headers or reference any specific analysis methods", "5. Do NOT mention 'direction', 'perspective', 'analysis type', or similar terms", "6. Write as a unified voice providing the final answer", "7. This will be the ONLY response the user sees - make it complete and standalone", "8. Your response should read as if it came from a single, integrated thought process", ] ) return "\n".join(input_parts) def _combine_dual_thinking_results( self, direction1: ThinkingDirection, content1: str, direction2: ThinkingDirection, content2: str, original_thought: str, ) -> str: """Combine dual thinking direction results.""" # If the second is synthesis thinking, return its result directly (should already be synthesized) if direction2 == ThinkingDirection.SYNTHESIS: return content2 # Otherwise create synthesized answer without mentioning analysis methods if ( direction1 == ThinkingDirection.FACTUAL and direction2 == ThinkingDirection.EMOTIONAL ): return f"Regarding '{original_thought}': A comprehensive analysis reveals both objective realities and human emotional responses. {content1.lower()} while also recognizing that {content2.lower()} These complementary insights suggest a balanced approach that considers both factual evidence and human experience." if ( direction1 == ThinkingDirection.CRITICAL and direction2 == ThinkingDirection.OPTIMISTIC ): return f"Considering '{original_thought}': A thorough evaluation identifies both important concerns and significant opportunities. {content1.lower().strip('.')} while also recognizing promising aspects: {content2.lower()} A measured approach would address the concerns while pursuing the benefits." # Generic synthesis, completely hiding analysis structure return f"Analyzing '{original_thought}': A comprehensive evaluation reveals multiple important insights. {content1.lower().strip('.')} Additionally, {content2.lower()} Integrating these findings provides a well-rounded understanding that addresses the question from multiple angles." def _synthesize_triple_thinking_results( self, results: dict[str, str], thinking_sequence: list[ThinkingDirection], original_thought: str, ) -> str: """Synthesize triple thinking direction results.""" # Create truly synthesized answer, hiding all analysis structure content_pieces = [] for thinking_direction in thinking_sequence: direction_name = thinking_direction.value content = results.get(direction_name, "") if content: # Extract core insights, completely hiding sources clean_content = content.strip().rstrip(".!") content_pieces.append(clean_content) if len(content_pieces) >= 3: # Synthesis of three or more perspectives, completely unified return f"""Considering the question '{original_thought}', a comprehensive analysis reveals several crucial insights. {content_pieces[0].lower()}, which establishes the foundation for understanding. This leads to recognizing that {content_pieces[1].lower()}, adding essential depth to our comprehension. Furthermore, {content_pieces[2].lower() if len(content_pieces) > 2 else ""} Drawing these insights together, the answer emerges as a unified understanding that acknowledges the full complexity while providing clear guidance.""" if len(content_pieces) == 2: return f"Addressing '{original_thought}': A thorough evaluation shows that {content_pieces[0].lower()}, and importantly, {content_pieces[1].lower()} Together, these insights form a comprehensive understanding." if len(content_pieces) == 1: return f"Regarding '{original_thought}': {content_pieces[0]}" return f"After comprehensive consideration of '{original_thought}', the analysis suggests this question merits deeper exploration to provide a complete answer." def _synthesize_full_thinking_results( self, results: dict[str, str], thinking_sequence: list[ThinkingDirection], original_thought: str, ) -> str: """Synthesize full multi-thinking results.""" # If there's a synthesis result, use it preferentially synthesis_result = results.get("synthesis") if synthesis_result: return synthesis_result # Otherwise create synthesis return self._synthesize_triple_thinking_results( results, thinking_sequence, original_thought ) def _get_thinking_contribution(self, thinking_direction: ThinkingDirection) -> str: """Get thinking direction contribution description.""" contributions = { ThinkingDirection.FACTUAL: "factual information and objective data", ThinkingDirection.EMOTIONAL: "emotional insights and intuitive responses", ThinkingDirection.CRITICAL: "critical analysis and risk assessment", ThinkingDirection.OPTIMISTIC: "positive possibilities and value identification", ThinkingDirection.CREATIVE: "creative alternatives and innovative solutions", ThinkingDirection.SYNTHESIS: "process management and integrated thinking", } return contributions.get(thinking_direction, "specialized thinking") def _get_generic_perspective_name(self, direction_name: str) -> str: """Get generic analysis type name for thinking direction, hiding direction concepts.""" name_mapping = { "factual": "Factual analysis", "emotional": "Emotional considerations", "critical": "Risk assessment", "optimistic": "Opportunity analysis", "creative": "Creative exploration", "synthesis": "Strategic synthesis", } return name_mapping.get(direction_name.lower(), "Analysis") def _get_thinking_style_instruction( self, thinking_direction: ThinkingDirection ) -> str: """Get thinking style instruction, avoiding mention of direction concepts.""" style_mapping = { ThinkingDirection.FACTUAL: "factual and objective", ThinkingDirection.EMOTIONAL: "emotional and intuitive", ThinkingDirection.CRITICAL: "critical and cautious", ThinkingDirection.OPTIMISTIC: "positive and optimistic", ThinkingDirection.CREATIVE: "creative and innovative", ThinkingDirection.SYNTHESIS: "strategic and integrative", } return style_mapping.get(thinking_direction, "analytical") # Create global processor instance _multi_thinking_processor = MultiThinkingSequentialProcessor() # Convenience function async def process_with_multi_thinking( thought_data: "ThoughtData", context: str = "" ) -> MultiThinkingProcessingResult: """Convenience function for processing thoughts using multi-thinking directions.""" return await _multi_thinking_processor.process_with_multi_thinking( thought_data, context ) def create_multi_thinking_step_output( result: MultiThinkingProcessingResult, ) -> StepOutput: """Convert multi-thinking processing result to Agno StepOutput.""" return StepOutput( content=result.content, success=True, step_name=result.step_name, ) ```