# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/tools/secaudit.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | SECAUDIT Workflow tool - Comprehensive security audit with systematic investigation
3 |
4 | This tool provides a structured workflow for comprehensive security assessment and analysis.
5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
6 | to ensure thorough security examination, vulnerability identification, and compliance assessment
7 | before proceeding. The tool supports complex security scenarios including OWASP Top 10 coverage,
8 | compliance framework mapping, and technology-specific security patterns.
9 |
10 | Key features:
11 | - Step-by-step security audit workflow with progress tracking
12 | - Context-aware file embedding (references during investigation, full content for analysis)
13 | - Automatic security issue tracking with severity classification
14 | - Expert analysis integration with external models
15 | - Support for focused security audits (OWASP, compliance, technology-specific)
16 | - Confidence-based workflow optimization
17 | - Risk-based prioritization and remediation planning
18 | """
19 |
20 | import logging
21 | from typing import TYPE_CHECKING, Any, Literal, Optional
22 |
23 | from pydantic import Field, model_validator
24 |
25 | if TYPE_CHECKING:
26 | from tools.models import ToolModelCategory
27 |
28 | from config import TEMPERATURE_ANALYTICAL
29 | from systemprompts import SECAUDIT_PROMPT
30 | from tools.shared.base_models import WorkflowRequest
31 |
32 | from .workflow.base import WorkflowTool
33 |
34 | logger = logging.getLogger(__name__)
35 |
36 | # Tool-specific field descriptions for security audit workflow
37 | SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS = {
38 | "step": (
39 | "Step 1: outline the audit strategy (OWASP Top 10, auth, validation, etc.). Later steps: report findings. MANDATORY: use `relevant_files` for code references and avoid large snippets."
40 | ),
41 | "step_number": "Current security-audit step number (starts at 1).",
42 | "total_steps": "Expected number of audit steps; adjust as new risks surface.",
43 | "next_step_required": "True while additional threat analysis remains; set False once you are ready to hand off for validation.",
44 | "findings": "Summarize vulnerabilities, auth issues, validation gaps, compliance notes, and positives; update prior findings as needed.",
45 | "files_checked": "Absolute paths for every file inspected, including rejected candidates.",
46 | "relevant_files": "Absolute paths for security-relevant files (auth modules, configs, sensitive code).",
47 | "relevant_context": "Security-critical classes/methods (e.g. 'AuthService.login', 'encryption_helper').",
48 | "issues_found": "Security issues with severity (critical/high/medium/low) and descriptions (vulns, auth flaws, injection, crypto, config).",
49 | "confidence": "exploring/low/medium/high/very_high/almost_certain/certain. 'certain' blocks external validation—use only when fully complete.",
50 | "images": "Optional absolute paths to diagrams or threat models that inform the audit.",
51 | "security_scope": "Security context (web, mobile, API, cloud, etc.) including stack, user types, data sensitivity, and threat landscape.",
52 | "threat_level": "Assess the threat level: low (internal/low-risk), medium (customer-facing/business data), high (regulated or sensitive), critical (financial/healthcare/PII).",
53 | "compliance_requirements": "Applicable compliance frameworks or standards (SOC2, PCI DSS, HIPAA, GDPR, ISO 27001, NIST, etc.).",
54 | "audit_focus": "Primary focus area: owasp, compliance, infrastructure, dependencies, or comprehensive.",
55 | "severity_filter": "Minimum severity to include when reporting security issues.",
56 | }
57 |
58 |
59 | class SecauditRequest(WorkflowRequest):
60 | """Request model for security audit workflow investigation steps"""
61 |
62 | # Required fields for each investigation step
63 | step: str = Field(..., description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["step"])
64 | step_number: int = Field(..., description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
65 | total_steps: int = Field(..., description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
66 | next_step_required: bool = Field(..., description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
67 |
68 | # Investigation tracking fields
69 | findings: str = Field(..., description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
70 | files_checked: list[str] = Field(
71 | default_factory=list, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
72 | )
73 | relevant_files: list[str] = Field(
74 | default_factory=list, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
75 | )
76 | relevant_context: list[str] = Field(
77 | default_factory=list, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
78 | )
79 | issues_found: list[dict] = Field(
80 | default_factory=list, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["issues_found"]
81 | )
82 | confidence: Optional[str] = Field("low", description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["confidence"])
83 |
84 | # Optional images for visual context
85 | images: Optional[list[str]] = Field(default=None, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["images"])
86 |
87 | # Security audit-specific fields
88 | security_scope: Optional[str] = Field(None, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["security_scope"])
89 | threat_level: Optional[Literal["low", "medium", "high", "critical"]] = Field(
90 | "medium", description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["threat_level"]
91 | )
92 | compliance_requirements: Optional[list[str]] = Field(
93 | default_factory=list, description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["compliance_requirements"]
94 | )
95 | audit_focus: Optional[Literal["owasp", "compliance", "infrastructure", "dependencies", "comprehensive"]] = Field(
96 | "comprehensive", description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["audit_focus"]
97 | )
98 | severity_filter: Optional[Literal["critical", "high", "medium", "low", "all"]] = Field(
99 | "all", description=SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["severity_filter"]
100 | )
101 |
102 | @model_validator(mode="after")
103 | def validate_security_audit_request(self):
104 | """Validate security audit request parameters"""
105 | # Ensure security scope is provided for comprehensive audits
106 | if self.step_number == 1 and not self.security_scope:
107 | logger.warning("Security scope not provided for security audit - defaulting to general application")
108 |
109 | # Validate compliance requirements format
110 | if self.compliance_requirements:
111 | valid_compliance = {"SOC2", "PCI DSS", "HIPAA", "GDPR", "ISO 27001", "NIST", "FedRAMP", "FISMA"}
112 | for req in self.compliance_requirements:
113 | if req not in valid_compliance:
114 | logger.warning(f"Unknown compliance requirement: {req}")
115 |
116 | return self
117 |
118 |
119 | class SecauditTool(WorkflowTool):
120 | """
121 | Comprehensive security audit workflow tool.
122 |
123 | Provides systematic security assessment through multi-step investigation
124 | covering OWASP Top 10, compliance requirements, and technology-specific
125 | security patterns. Follows established WorkflowTool patterns while adding
126 | security-specific capabilities.
127 | """
128 |
129 | def __init__(self):
130 | super().__init__()
131 | self.initial_request = None
132 | self.security_config = {}
133 |
134 | def get_name(self) -> str:
135 | """Return the unique name of the tool."""
136 | return "secaudit"
137 |
138 | def get_description(self) -> str:
139 | """Return a description of the tool."""
140 | return (
141 | "Performs comprehensive security audit with systematic vulnerability assessment. "
142 | "Use for OWASP Top 10 analysis, compliance evaluation, threat modeling, and security architecture review. "
143 | "Guides through structured security investigation with expert validation."
144 | )
145 |
146 | def get_system_prompt(self) -> str:
147 | """Return the system prompt for expert security analysis."""
148 | return SECAUDIT_PROMPT
149 |
150 | def get_default_temperature(self) -> float:
151 | """Return the temperature for security audit analysis"""
152 | return TEMPERATURE_ANALYTICAL
153 |
154 | def get_model_category(self) -> "ToolModelCategory":
155 | """Return the model category for security audit"""
156 | from tools.models import ToolModelCategory
157 |
158 | return ToolModelCategory.EXTENDED_REASONING
159 |
160 | def get_workflow_request_model(self) -> type:
161 | """Return the workflow request model class"""
162 | return SecauditRequest
163 |
164 | def get_tool_fields(self) -> dict[str, dict[str, Any]]:
165 | """
166 | Get security audit tool field definitions.
167 |
168 | Returns comprehensive field definitions including security-specific
169 | parameters while maintaining compatibility with existing workflow patterns.
170 | """
171 | return SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS
172 |
173 | def get_required_actions(
174 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
175 | ) -> list[str]:
176 | """
177 | Provide step-specific guidance for systematic security analysis.
178 |
179 | Each step focuses on specific security domains to ensure comprehensive
180 | coverage without missing critical security aspects.
181 | """
182 | if step_number == 1:
183 | return [
184 | "Identify application type, technology stack, and security scope",
185 | "Map attack surface, entry points, and data flows",
186 | "Determine relevant security standards and compliance requirements",
187 | "Establish threat landscape and risk context for the application",
188 | ]
189 | elif step_number == 2:
190 | return [
191 | "Analyze authentication mechanisms and session management",
192 | "Check authorization controls, access patterns, and privilege escalation risks",
193 | "Assess multi-factor authentication, password policies, and account security",
194 | "Review identity and access management implementations",
195 | ]
196 | elif step_number == 3:
197 | return [
198 | "Examine input validation and sanitization mechanisms across all entry points",
199 | "Check for injection vulnerabilities (SQL, XSS, Command, LDAP, NoSQL)",
200 | "Review data encryption, sensitive data handling, and cryptographic implementations",
201 | "Analyze API input validation, rate limiting, and request/response security",
202 | ]
203 | elif step_number == 4:
204 | return [
205 | "Conduct OWASP Top 10 (2021) systematic review across all categories",
206 | "Check each OWASP category methodically with specific findings and evidence",
207 | "Cross-reference findings with application context and technology stack",
208 | "Prioritize vulnerabilities based on exploitability and business impact",
209 | ]
210 | elif step_number == 5:
211 | return [
212 | "Analyze third-party dependencies for known vulnerabilities and outdated versions",
213 | "Review configuration security, default settings, and hardening measures",
214 | "Check for hardcoded secrets, credentials, and sensitive information exposure",
215 | "Assess logging, monitoring, incident response, and security observability",
216 | ]
217 | elif step_number == 6:
218 | return [
219 | "Evaluate compliance requirements and identify gaps in controls",
220 | "Assess business impact and risk levels of all identified findings",
221 | "Create prioritized remediation roadmap with timeline and effort estimates",
222 | "Document comprehensive security posture and recommendations",
223 | ]
224 | else:
225 | return [
226 | "Continue systematic security investigation based on emerging findings",
227 | "Deep-dive into specific security concerns identified in previous steps",
228 | "Validate security hypotheses and confirm vulnerability assessments",
229 | ]
230 |
231 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
232 | """
233 | Determine when to call expert security analysis.
234 |
235 | Expert analysis is triggered when the security audit has meaningful findings
236 | unless the user has requested to skip the assistant model.
237 | """
238 | # Check if the user requested to skip the assistant model
239 | if request and not self.get_request_use_assistant_model(request):
240 | return False
241 |
242 | # Check if we have meaningful investigation data
243 | return (
244 | len(consolidated_findings.relevant_files) > 0
245 | or len(consolidated_findings.findings) >= 2
246 | or len(consolidated_findings.issues_found) > 0
247 | )
248 |
249 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
250 | """
251 | Prepare comprehensive context for expert security model analysis.
252 |
253 | Provides security-specific context including scope, threat level,
254 | compliance requirements, and systematic findings for expert validation.
255 | """
256 | context_parts = [
257 | f"=== SECURITY AUDIT REQUEST ===\n{self.initial_request or 'Security audit workflow initiated'}\n=== END REQUEST ==="
258 | ]
259 |
260 | # Add investigation summary
261 | investigation_summary = self._build_security_audit_summary(consolidated_findings)
262 | context_parts.append(
263 | f"\n=== AGENT'S SECURITY INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
264 | )
265 |
266 | # Add security configuration context if available
267 | if self.security_config:
268 | config_text = "\n".join(f"- {key}: {value}" for key, value in self.security_config.items() if value)
269 | context_parts.append(f"\n=== SECURITY CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
270 |
271 | # Add relevant files if available
272 | if consolidated_findings.relevant_files:
273 | files_text = "\n".join(f"- {file}" for file in consolidated_findings.relevant_files)
274 | context_parts.append(f"\n=== RELEVANT FILES ===\n{files_text}\n=== END FILES ===")
275 |
276 | # Add relevant security elements if available
277 | if consolidated_findings.relevant_context:
278 | methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
279 | context_parts.append(
280 | f"\n=== SECURITY-CRITICAL CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ==="
281 | )
282 |
283 | # Add security issues found if available
284 | if consolidated_findings.issues_found:
285 | issues_text = self._format_security_issues(consolidated_findings.issues_found)
286 | context_parts.append(f"\n=== SECURITY ISSUES IDENTIFIED ===\n{issues_text}\n=== END ISSUES ===")
287 |
288 | # Add assessment evolution if available
289 | if consolidated_findings.hypotheses:
290 | assessments_text = "\n".join(
291 | f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
292 | for h in consolidated_findings.hypotheses
293 | )
294 | context_parts.append(f"\n=== ASSESSMENT EVOLUTION ===\n{assessments_text}\n=== END ASSESSMENTS ===")
295 |
296 | # Add images if available
297 | if consolidated_findings.images:
298 | images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
299 | context_parts.append(
300 | f"\n=== VISUAL SECURITY INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
301 | )
302 |
303 | return "\n".join(context_parts)
304 |
305 | def _format_security_issues(self, issues_found: list[dict]) -> str:
306 | """
307 | Format security issues for expert analysis.
308 |
309 | Organizes security findings by severity for clear expert review.
310 | """
311 | if not issues_found:
312 | return "No security issues identified during systematic investigation."
313 |
314 | # Group issues by severity
315 | severity_groups = {"critical": [], "high": [], "medium": [], "low": []}
316 |
317 | for issue in issues_found:
318 | severity = issue.get("severity", "low").lower()
319 | description = issue.get("description", "No description provided")
320 | if severity in severity_groups:
321 | severity_groups[severity].append(description)
322 | else:
323 | severity_groups["low"].append(f"[{severity.upper()}] {description}")
324 |
325 | formatted_issues = []
326 | for severity in ["critical", "high", "medium", "low"]:
327 | if severity_groups[severity]:
328 | formatted_issues.append(f"\n{severity.upper()} SEVERITY:")
329 | for issue in severity_groups[severity]:
330 | formatted_issues.append(f" • {issue}")
331 |
332 | return "\n".join(formatted_issues) if formatted_issues else "No security issues identified."
333 |
334 | def _build_security_audit_summary(self, consolidated_findings) -> str:
335 | """Prepare a comprehensive summary of the security audit investigation."""
336 | summary_parts = [
337 | "=== SYSTEMATIC SECURITY AUDIT INVESTIGATION SUMMARY ===",
338 | f"Total steps: {len(consolidated_findings.findings)}",
339 | f"Files examined: {len(consolidated_findings.files_checked)}",
340 | f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
341 | f"Security-critical elements analyzed: {len(consolidated_findings.relevant_context)}",
342 | f"Security issues identified: {len(consolidated_findings.issues_found)}",
343 | "",
344 | "=== INVESTIGATION PROGRESSION ===",
345 | ]
346 |
347 | for finding in consolidated_findings.findings:
348 | summary_parts.append(finding)
349 |
350 | return "\n".join(summary_parts)
351 |
352 | def get_input_schema(self) -> dict[str, Any]:
353 | """Generate input schema using WorkflowSchemaBuilder with security audit-specific overrides."""
354 | from .workflow.schema_builders import WorkflowSchemaBuilder
355 |
356 | # Security audit workflow-specific field overrides
357 | secaudit_field_overrides = {
358 | "step": {
359 | "type": "string",
360 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["step"],
361 | },
362 | "step_number": {
363 | "type": "integer",
364 | "minimum": 1,
365 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
366 | },
367 | "total_steps": {
368 | "type": "integer",
369 | "minimum": 1,
370 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
371 | },
372 | "next_step_required": {
373 | "type": "boolean",
374 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
375 | },
376 | "findings": {
377 | "type": "string",
378 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
379 | },
380 | "files_checked": {
381 | "type": "array",
382 | "items": {"type": "string"},
383 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
384 | },
385 | "relevant_files": {
386 | "type": "array",
387 | "items": {"type": "string"},
388 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
389 | },
390 | "confidence": {
391 | "type": "string",
392 | "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
393 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
394 | },
395 | "issues_found": {
396 | "type": "array",
397 | "items": {"type": "object"},
398 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["issues_found"],
399 | },
400 | "images": {
401 | "type": "array",
402 | "items": {"type": "string"},
403 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["images"],
404 | },
405 | # Security audit-specific fields (for step 1)
406 | "security_scope": {
407 | "type": "string",
408 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["security_scope"],
409 | },
410 | "threat_level": {
411 | "type": "string",
412 | "enum": ["low", "medium", "high", "critical"],
413 | "default": "medium",
414 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["threat_level"],
415 | },
416 | "compliance_requirements": {
417 | "type": "array",
418 | "items": {"type": "string"},
419 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["compliance_requirements"],
420 | },
421 | "audit_focus": {
422 | "type": "string",
423 | "enum": ["owasp", "compliance", "infrastructure", "dependencies", "comprehensive"],
424 | "default": "comprehensive",
425 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["audit_focus"],
426 | },
427 | "severity_filter": {
428 | "type": "string",
429 | "enum": ["critical", "high", "medium", "low", "all"],
430 | "default": "all",
431 | "description": SECAUDIT_WORKFLOW_FIELD_DESCRIPTIONS["severity_filter"],
432 | },
433 | }
434 |
435 | # Use WorkflowSchemaBuilder with security audit-specific tool fields
436 | return WorkflowSchemaBuilder.build_schema(
437 | tool_specific_fields=secaudit_field_overrides,
438 | model_field_schema=self.get_model_field_schema(),
439 | auto_mode=self.is_effective_auto_mode(),
440 | tool_name=self.get_name(),
441 | )
442 |
443 | # Hook method overrides for security audit-specific behavior
444 |
445 | def prepare_step_data(self, request) -> dict:
446 | """Map security audit-specific fields for internal processing."""
447 | step_data = {
448 | "step": request.step,
449 | "step_number": request.step_number,
450 | "findings": request.findings,
451 | "files_checked": request.files_checked,
452 | "relevant_files": request.relevant_files,
453 | "relevant_context": request.relevant_context,
454 | "issues_found": request.issues_found,
455 | "confidence": request.confidence,
456 | "hypothesis": request.findings, # Map findings to hypothesis for compatibility
457 | "images": request.images or [],
458 | }
459 |
460 | # Store security-specific configuration on first step
461 | if request.step_number == 1:
462 | self.security_config = {
463 | "security_scope": request.security_scope,
464 | "threat_level": request.threat_level,
465 | "compliance_requirements": request.compliance_requirements,
466 | "audit_focus": request.audit_focus,
467 | "severity_filter": request.severity_filter,
468 | }
469 |
470 | return step_data
471 |
472 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
473 | """Security audit workflow skips expert analysis when the CLI agent has "certain" confidence."""
474 | return request.confidence == "certain" and not request.next_step_required
475 |
476 | def store_initial_issue(self, step_description: str):
477 | """Store initial request for expert analysis."""
478 | self.initial_request = step_description
479 |
480 | def should_include_files_in_expert_prompt(self) -> bool:
481 | """Include files in expert analysis for comprehensive security audit."""
482 | return True
483 |
484 | def should_embed_system_prompt(self) -> bool:
485 | """Embed system prompt in expert analysis for proper context."""
486 | return True
487 |
488 | def get_expert_thinking_mode(self) -> str:
489 | """Use high thinking mode for thorough security analysis."""
490 | return "high"
491 |
492 | def get_expert_analysis_instruction(self) -> str:
493 | """Get specific instruction for security audit expert analysis."""
494 | return (
495 | "Please provide comprehensive security analysis based on the investigation findings. "
496 | "Focus on identifying any remaining vulnerabilities, validating the completeness of the analysis, "
497 | "and providing final recommendations for security improvements, following the OWASP-based "
498 | "format specified in the system prompt."
499 | )
500 |
501 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
502 | """
503 | Security audit-specific completion message.
504 | """
505 | base_message = (
506 | "SECURITY AUDIT IS COMPLETE. You MUST now summarize and present ALL security findings organized by "
507 | "severity (Critical → High → Medium → Low), specific code locations with line numbers, and exact "
508 | "remediation steps for each vulnerability. Clearly prioritize the top 3 security issues that need "
509 | "immediate attention. Provide concrete, actionable guidance for each vulnerability—make it easy for "
510 | "developers to understand exactly what needs to be fixed and how to implement the security improvements."
511 | )
512 |
513 | # Add expert analysis guidance only when expert analysis was actually used
514 | if expert_analysis_used:
515 | expert_guidance = self.get_expert_analysis_guidance()
516 | if expert_guidance:
517 | return f"{base_message}\n\n{expert_guidance}"
518 |
519 | return base_message
520 |
521 | def get_expert_analysis_guidance(self) -> str:
522 | """
523 | Provide specific guidance for handling expert analysis in security audits.
524 | """
525 | return (
526 | "IMPORTANT: Analysis from an assistant model has been provided above. You MUST critically evaluate and validate "
527 | "the expert security findings rather than accepting them blindly. Cross-reference the expert analysis with "
528 | "your own investigation findings, verify that suggested security improvements are appropriate for this "
529 | "application's context and threat model, and ensure recommendations align with the project's security requirements. "
530 | "Present a synthesis that combines your systematic security review with validated expert insights, clearly "
531 | "distinguishing between vulnerabilities you've independently confirmed and additional insights from expert analysis."
532 | )
533 |
534 | def get_step_guidance_message(self, request) -> str:
535 | """
536 | Security audit-specific step guidance with detailed investigation instructions.
537 | """
538 | step_guidance = self.get_security_audit_step_guidance(request.step_number, request.confidence, request)
539 | return step_guidance["next_steps"]
540 |
541 | def get_security_audit_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
542 | """
543 | Provide step-specific guidance for security audit workflow.
544 | """
545 | # Generate the next steps instruction based on required actions
546 | required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
547 |
548 | if step_number == 1:
549 | next_steps = (
550 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
551 | f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
552 | f"the security landscape, identify potential vulnerabilities across OWASP Top 10 categories, "
553 | f"and look for authentication flaws, injection points, cryptographic issues, and authorization bypasses. "
554 | f"Use file reading tools, security analysis, and systematic examination to gather comprehensive information. "
555 | f"Only call {self.get_name()} again AFTER completing your security investigation. When you call "
556 | f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
557 | f"files examined, vulnerabilities found, and security assessments discovered."
558 | )
559 | elif confidence in ["exploring", "low"]:
560 | next_steps = (
561 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
562 | f"deeper security analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
563 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
564 | + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
565 | + "completing these security audit tasks."
566 | )
567 | elif confidence in ["medium", "high"]:
568 | next_steps = (
569 | f"WAIT! Your security audit needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
570 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
571 | + f"\n\nREMEMBER: Ensure you have identified all significant vulnerabilities across all severity levels and "
572 | f"verified the completeness of your security review. Document findings with specific file references and "
573 | f"line numbers where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
574 | )
575 | else:
576 | next_steps = (
577 | f"PAUSE SECURITY AUDIT. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
578 | + "Required: "
579 | + ", ".join(required_actions[:2])
580 | + ". "
581 | + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
582 | f"NEW evidence from actual security analysis, not just theories. NO recursive {self.get_name()} calls "
583 | f"without investigation work!"
584 | )
585 |
586 | return {"next_steps": next_steps}
587 |
588 | def customize_workflow_response(self, response_data: dict, request) -> dict:
589 | """
590 | Customize response to match security audit workflow format.
591 | """
592 | # Store initial request on first step
593 | if request.step_number == 1:
594 | self.initial_request = request.step
595 | # Store security configuration for expert analysis
596 | if request.relevant_files:
597 | self.security_config = {
598 | "relevant_files": request.relevant_files,
599 | "security_scope": request.security_scope,
600 | "threat_level": request.threat_level,
601 | "compliance_requirements": request.compliance_requirements,
602 | "audit_focus": request.audit_focus,
603 | "severity_filter": request.severity_filter,
604 | }
605 |
606 | # Convert generic status names to security audit-specific ones
607 | tool_name = self.get_name()
608 | status_mapping = {
609 | f"{tool_name}_in_progress": "security_audit_in_progress",
610 | f"pause_for_{tool_name}": "pause_for_security_audit",
611 | f"{tool_name}_required": "security_audit_required",
612 | f"{tool_name}_complete": "security_audit_complete",
613 | }
614 |
615 | if response_data["status"] in status_mapping:
616 | response_data["status"] = status_mapping[response_data["status"]]
617 |
618 | # Rename status field to match security audit workflow
619 | if f"{tool_name}_status" in response_data:
620 | response_data["security_audit_status"] = response_data.pop(f"{tool_name}_status")
621 | # Add security audit-specific status fields
622 | response_data["security_audit_status"]["vulnerabilities_by_severity"] = {}
623 | for issue in self.consolidated_findings.issues_found:
624 | severity = issue.get("severity", "unknown")
625 | if severity not in response_data["security_audit_status"]["vulnerabilities_by_severity"]:
626 | response_data["security_audit_status"]["vulnerabilities_by_severity"][severity] = 0
627 | response_data["security_audit_status"]["vulnerabilities_by_severity"][severity] += 1
628 | response_data["security_audit_status"]["audit_confidence"] = self.get_request_confidence(request)
629 |
630 | # Map complete_secaudit to complete_security_audit
631 | if f"complete_{tool_name}" in response_data:
632 | response_data["complete_security_audit"] = response_data.pop(f"complete_{tool_name}")
633 |
634 | # Map the completion flag to match security audit workflow
635 | if f"{tool_name}_complete" in response_data:
636 | response_data["security_audit_complete"] = response_data.pop(f"{tool_name}_complete")
637 |
638 | return response_data
639 |
640 | # Override inheritance hooks for security audit-specific behavior
641 |
642 | def get_completion_status(self) -> str:
643 | """Security audit tools use audit-specific status."""
644 | return "security_analysis_complete"
645 |
646 | def get_completion_data_key(self) -> str:
647 | """Security audit uses 'complete_security_audit' key."""
648 | return "complete_security_audit"
649 |
650 | def get_final_analysis_from_request(self, request):
651 | """Security audit tools use 'findings' field."""
652 | return request.findings
653 |
654 | def get_confidence_level(self, request) -> str:
655 | """Security audit tools use 'certain' for high confidence."""
656 | return "certain"
657 |
658 | def get_completion_message(self) -> str:
659 | """Security audit-specific completion message."""
660 | return (
661 | "Security audit complete with CERTAIN confidence. You have identified all significant vulnerabilities "
662 | "and provided comprehensive security analysis. MANDATORY: Present the user with the complete security audit results "
663 | "categorized by severity, and IMMEDIATELY proceed with implementing the highest priority security fixes "
664 | "or provide specific guidance for vulnerability remediation. Focus on actionable security recommendations."
665 | )
666 |
667 | def get_skip_reason(self) -> str:
668 | """Security audit-specific skip reason."""
669 | return "Completed comprehensive security audit with full confidence locally"
670 |
671 | def get_skip_expert_analysis_status(self) -> str:
672 | """Security audit-specific expert analysis skip status."""
673 | return "skipped_due_to_certain_audit_confidence"
674 |
675 | def prepare_work_summary(self) -> str:
676 | """Security audit-specific work summary."""
677 | return self._build_security_audit_summary(self.consolidated_findings)
678 |
679 | def get_request_model(self):
680 | """Return the request model for this tool"""
681 | return SecauditRequest
682 |
683 | async def prepare_prompt(self, request: SecauditRequest) -> str:
684 | """Not used - workflow tools use execute_workflow()."""
685 | return "" # Workflow tools use execute_workflow() directly
686 |
```
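For orientation, the sketch below shows what a first `secaudit` step payload might look like given the `SecauditRequest` fields defined above. It is an illustration only: the paths, findings, and issue entries are placeholders, and the commented-out import assumes the repository root is on `PYTHONPATH` with the module importable as `tools.secaudit`.
```python
# Hypothetical step-1 payload mirroring SecauditRequest; all values are placeholders.
example_step_1 = {
    "step": "Outline the audit strategy: map the attack surface, auth flows, and OWASP Top 10 coverage.",
    "step_number": 1,
    "total_steps": 6,
    "next_step_required": True,
    "findings": "Initial scope established; authentication and input validation flagged for deeper review.",
    "files_checked": ["/abs/path/to/auth_service.py"],   # placeholder absolute path
    "relevant_files": ["/abs/path/to/auth_service.py"],  # placeholder absolute path
    "relevant_context": ["AuthService.login"],
    "issues_found": [
        {"severity": "high", "description": "Session tokens are never expired server-side."}
    ],
    "confidence": "low",
    # Step-1-only configuration captured into self.security_config by prepare_step_data()
    "security_scope": "Customer-facing web API handling payment data",
    "threat_level": "high",
    "compliance_requirements": ["PCI DSS"],
    "audit_focus": "owasp",
    "severity_filter": "all",
}

# With the repository importable, the Pydantic model validates this payload directly;
# an unrecognised compliance string would only produce a logged warning.
# from tools.secaudit import SecauditRequest
# request = SecauditRequest(**example_step_1)
```
Because `prepare_step_data` only stores the security configuration when `step_number == 1`, later steps typically repeat just the investigation-tracking fields (`findings`, `files_checked`, `issues_found`, `confidence`) while the scope settings persist on the tool instance.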
--------------------------------------------------------------------------------
/simulator_tests/test_testgen_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | TestGen Tool Validation Test
4 |
5 | Tests the testgen tool's capabilities using the workflow architecture.
6 | This validates that the workflow-based implementation guides Claude through
7 | systematic test generation analysis before creating comprehensive test suites.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class TestGenValidationTest(ConversationBaseTest):
17 | """Test testgen tool with workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "testgen_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "TestGen tool validation with step-by-step test planning"
26 |
27 | def run_test(self) -> bool:
28 | """Test testgen tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: TestGen tool validation")
34 |
35 | # Create sample code files to test
36 | self._create_test_code_files()
37 |
38 | # Test 1: Single investigation session with multiple steps
39 | if not self._test_single_test_generation_session():
40 | return False
41 |
42 | # Test 2: Test generation with pattern following
43 | if not self._test_generation_with_pattern_following():
44 | return False
45 |
46 | # Test 3: Complete test generation with expert analysis
47 | if not self._test_complete_generation_with_analysis():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Multi-step test planning
59 | if not self._test_multi_step_test_planning():
60 | return False
61 |
62 | self.logger.info(" ✅ All testgen validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"TestGen validation test failed: {e}")
67 | return False
68 |
69 | def _create_test_code_files(self):
70 | """Create sample code files for test generation"""
71 | # Create a calculator module with various functions
72 | calculator_code = """#!/usr/bin/env python3
73 | \"\"\"
74 | Simple calculator module for demonstration
75 | \"\"\"
76 |
77 | def add(a, b):
78 | \"\"\"Add two numbers\"\"\"
79 | return a + b
80 |
81 | def subtract(a, b):
82 | \"\"\"Subtract b from a\"\"\"
83 | return a - b
84 |
85 | def multiply(a, b):
86 | \"\"\"Multiply two numbers\"\"\"
87 | return a * b
88 |
89 | def divide(a, b):
90 | \"\"\"Divide a by b\"\"\"
91 | if b == 0:
92 | raise ValueError("Cannot divide by zero")
93 | return a / b
94 |
95 | def calculate_percentage(value, percentage):
96 | \"\"\"Calculate percentage of a value\"\"\"
97 | if percentage < 0:
98 | raise ValueError("Percentage cannot be negative")
99 | if percentage > 100:
100 | raise ValueError("Percentage cannot exceed 100")
101 | return (value * percentage) / 100
102 |
103 | def power(base, exponent):
104 | \"\"\"Calculate base raised to exponent\"\"\"
105 | if base == 0 and exponent < 0:
106 | raise ValueError("Cannot raise 0 to negative power")
107 | return base ** exponent
108 | """
109 |
110 | # Create test file
111 | self.calculator_file = self.create_additional_test_file("calculator.py", calculator_code)
112 | self.logger.info(f" ✅ Created calculator module: {self.calculator_file}")
113 |
114 | # Create a simple existing test file to use as pattern
115 | existing_test = """#!/usr/bin/env python3
116 | import pytest
117 | from calculator import add, subtract
118 |
119 | class TestCalculatorBasic:
120 | \"\"\"Test basic calculator operations\"\"\"
121 |
122 | def test_add_positive_numbers(self):
123 | \"\"\"Test adding two positive numbers\"\"\"
124 | assert add(2, 3) == 5
125 | assert add(10, 20) == 30
126 |
127 | def test_add_negative_numbers(self):
128 | \"\"\"Test adding negative numbers\"\"\"
129 | assert add(-5, -3) == -8
130 | assert add(-10, 5) == -5
131 |
132 | def test_subtract_positive(self):
133 | \"\"\"Test subtracting positive numbers\"\"\"
134 | assert subtract(10, 3) == 7
135 | assert subtract(5, 5) == 0
136 | """
137 |
138 | self.existing_test_file = self.create_additional_test_file("test_calculator_basic.py", existing_test)
139 | self.logger.info(f" ✅ Created existing test file: {self.existing_test_file}")
140 |
141 | def _test_single_test_generation_session(self) -> bool:
142 | """Test a complete test generation session with multiple steps"""
143 | try:
144 | self.logger.info(" 1.1: Testing single test generation session")
145 |
146 | # Step 1: Start investigation
147 | self.logger.info(" 1.1.1: Step 1 - Initial test planning")
148 | response1, continuation_id = self.call_mcp_tool(
149 | "testgen",
150 | {
151 | "step": "I need to generate comprehensive tests for the calculator module. Let me start by analyzing the code structure and understanding the functionality.",
152 | "step_number": 1,
153 | "total_steps": 4,
154 | "next_step_required": True,
155 | "findings": "Calculator module contains 6 functions: add, subtract, multiply, divide, calculate_percentage, and power. Each has specific error conditions that need testing.",
156 | "files_checked": [self.calculator_file],
157 | "relevant_files": [self.calculator_file],
158 | "relevant_context": ["add", "subtract", "multiply", "divide", "calculate_percentage", "power"],
159 | },
160 | )
161 |
162 | if not response1 or not continuation_id:
163 | self.logger.error("Failed to get initial test planning response")
164 | return False
165 |
166 | # Parse and validate JSON response
167 | response1_data = self._parse_testgen_response(response1)
168 | if not response1_data:
169 | return False
170 |
171 | # Validate step 1 response structure
172 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_test_analysis"):
173 | return False
174 |
175 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
176 |
177 | # Step 2: Analyze test requirements
178 | self.logger.info(" 1.1.2: Step 2 - Test requirements analysis")
179 | response2, _ = self.call_mcp_tool(
180 | "testgen",
181 | {
182 | "step": "Now analyzing the test requirements for each function, identifying edge cases and boundary conditions.",
183 | "step_number": 2,
184 | "total_steps": 4,
185 | "next_step_required": True,
186 | "findings": "Identified key test scenarios: (1) divide - zero division error, (2) calculate_percentage - negative/over 100 validation, (3) power - zero to negative power error. Need tests for normal cases and edge cases.",
187 | "files_checked": [self.calculator_file],
188 | "relevant_files": [self.calculator_file],
189 | "relevant_context": ["divide", "calculate_percentage", "power"],
190 | "confidence": "medium",
191 | "continuation_id": continuation_id,
192 | },
193 | )
194 |
195 | if not response2:
196 | self.logger.error("Failed to continue test planning to step 2")
197 | return False
198 |
199 | response2_data = self._parse_testgen_response(response2)
200 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_test_analysis"):
201 | return False
202 |
203 | # Check test generation status tracking
204 | test_status = response2_data.get("test_generation_status", {})
205 | if test_status.get("test_scenarios_identified", 0) < 3:
206 | self.logger.error("Test scenarios not properly tracked")
207 | return False
208 |
209 | if test_status.get("analysis_confidence") != "medium":
210 | self.logger.error("Confidence level not properly tracked")
211 | return False
212 |
213 | self.logger.info(" ✅ Step 2 successful with proper tracking")
214 |
215 | # Store continuation_id for next test
216 | self.test_continuation_id = continuation_id
217 | return True
218 |
219 | except Exception as e:
220 | self.logger.error(f"Single test generation session test failed: {e}")
221 | return False
222 |
223 | def _test_generation_with_pattern_following(self) -> bool:
224 | """Test test generation following existing patterns"""
225 | try:
226 | self.logger.info(" 1.2: Testing test generation with pattern following")
227 |
228 | # Start a new investigation with existing test patterns
229 | self.logger.info(" 1.2.1: Start test generation with pattern reference")
230 | response1, continuation_id = self.call_mcp_tool(
231 | "testgen",
232 | {
233 | "step": "Generating tests for remaining calculator functions following existing test patterns",
234 | "step_number": 1,
235 | "total_steps": 3,
236 | "next_step_required": True,
237 | "findings": "Found existing test pattern using pytest with class-based organization and descriptive test names",
238 | "files_checked": [self.calculator_file, self.existing_test_file],
239 | "relevant_files": [self.calculator_file, self.existing_test_file],
240 | "relevant_context": ["TestCalculatorBasic", "multiply", "divide", "calculate_percentage", "power"],
241 | },
242 | )
243 |
244 | if not response1 or not continuation_id:
245 | self.logger.error("Failed to start pattern following test")
246 | return False
247 |
248 | # Step 2: Analyze patterns
249 | self.logger.info(" 1.2.2: Step 2 - Pattern analysis")
250 | response2, _ = self.call_mcp_tool(
251 | "testgen",
252 | {
253 | "step": "Analyzing the existing test patterns to maintain consistency",
254 | "step_number": 2,
255 | "total_steps": 3,
256 | "next_step_required": True,
257 | "findings": "Existing tests use: class-based organization (TestCalculatorBasic), descriptive method names (test_operation_scenario), multiple assertions per test, pytest framework",
258 | "files_checked": [self.existing_test_file],
259 | "relevant_files": [self.calculator_file, self.existing_test_file],
260 | "confidence": "high",
261 | "continuation_id": continuation_id,
262 | },
263 | )
264 |
265 | if not response2:
266 | self.logger.error("Failed to continue to step 2")
267 | return False
268 |
269 | self.logger.info(" ✅ Pattern analysis successful")
270 | return True
271 |
272 | except Exception as e:
273 | self.logger.error(f"Pattern following test failed: {e}")
274 | return False
275 |
276 | def _test_complete_generation_with_analysis(self) -> bool:
277 | """Test complete test generation ending with expert analysis"""
278 | try:
279 | self.logger.info(" 1.3: Testing complete test generation with expert analysis")
280 |
281 | # Use the continuation from first test or start fresh
282 | continuation_id = getattr(self, "test_continuation_id", None)
283 | if not continuation_id:
284 | # Start fresh if no continuation available
285 | self.logger.info(" 1.3.0: Starting fresh test generation")
286 | response0, continuation_id = self.call_mcp_tool(
287 | "testgen",
288 | {
289 | "step": "Analyzing calculator module for comprehensive test generation",
290 | "step_number": 1,
291 | "total_steps": 2,
292 | "next_step_required": True,
293 | "findings": "Identified 6 functions needing tests with various edge cases",
294 | "files_checked": [self.calculator_file],
295 | "relevant_files": [self.calculator_file],
296 | "relevant_context": ["add", "subtract", "multiply", "divide", "calculate_percentage", "power"],
297 | },
298 | )
299 | if not response0 or not continuation_id:
300 | self.logger.error("Failed to start fresh test generation")
301 | return False
302 |
303 | # Final step - trigger expert analysis
304 | self.logger.info(" 1.3.1: Final step - complete test planning")
305 | response_final, _ = self.call_mcp_tool(
306 | "testgen",
307 | {
308 | "step": "Test planning complete. Identified all test scenarios including edge cases, error conditions, and boundary values for comprehensive coverage.",
309 | "step_number": 2,
310 | "total_steps": 2,
311 | "next_step_required": False, # Final step - triggers expert analysis
312 | "findings": "Complete test plan: normal operations, edge cases (zero, negative), error conditions (divide by zero, invalid percentage, zero to negative power), boundary values",
313 | "files_checked": [self.calculator_file],
314 | "relevant_files": [self.calculator_file],
315 | "relevant_context": ["add", "subtract", "multiply", "divide", "calculate_percentage", "power"],
316 | "confidence": "high",
317 | "continuation_id": continuation_id,
318 | "model": "flash", # Use flash for expert analysis
319 | },
320 | )
321 |
322 | if not response_final:
323 | self.logger.error("Failed to complete test generation")
324 | return False
325 |
326 | response_final_data = self._parse_testgen_response(response_final)
327 | if not response_final_data:
328 | return False
329 |
330 | # Validate final response structure
331 | if response_final_data.get("status") != "calling_expert_analysis":
332 | self.logger.error(
333 | f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
334 | )
335 | return False
336 |
337 | if not response_final_data.get("test_generation_complete"):
338 | self.logger.error("Expected test_generation_complete=true for final step")
339 | return False
340 |
341 | # Check for expert analysis
342 | if "expert_analysis" not in response_final_data:
343 | self.logger.error("Missing expert_analysis in final response")
344 | return False
345 |
346 | expert_analysis = response_final_data.get("expert_analysis", {})
347 |
348 | # Check for expected analysis content
349 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
350 |
351 | # Look for test generation indicators
352 | test_indicators = ["test", "edge", "boundary", "error", "coverage", "pytest"]
353 | found_indicators = sum(1 for indicator in test_indicators if indicator in analysis_text)
354 |
355 | if found_indicators >= 4:
356 | self.logger.info(" ✅ Expert analysis provided comprehensive test suggestions")
357 | else:
358 | self.logger.warning(
359 | f" ⚠️ Expert analysis may not have fully addressed test generation (found {found_indicators}/6 indicators)"
360 | )
361 |
362 | # Check complete test generation summary
363 | if "complete_test_generation" not in response_final_data:
364 | self.logger.error("Missing complete_test_generation in final response")
365 | return False
366 |
367 | complete_generation = response_final_data["complete_test_generation"]
368 | if not complete_generation.get("relevant_context"):
369 | self.logger.error("Missing relevant context in complete test generation")
370 | return False
371 |
372 | self.logger.info(" ✅ Complete test generation with expert analysis successful")
373 | return True
374 |
375 | except Exception as e:
376 | self.logger.error(f"Complete test generation test failed: {e}")
377 | return False
378 |
379 | def _test_certain_confidence(self) -> bool:
380 | """Test certain confidence behavior - should skip expert analysis"""
381 | try:
382 | self.logger.info(" 1.4: Testing certain confidence behavior")
383 |
384 | # Test certain confidence - should skip expert analysis
385 | self.logger.info(" 1.4.1: Certain confidence test generation")
386 | response_certain, _ = self.call_mcp_tool(
387 | "testgen",
388 | {
389 | "step": "I have fully analyzed the code and identified all test scenarios with 100% certainty. Test plan is complete.",
390 | "step_number": 1,
391 | "total_steps": 1,
392 | "next_step_required": False, # Final step
393 | "findings": "Complete test coverage plan: all functions covered with normal cases, edge cases, and error conditions. Ready for implementation.",
394 | "files_checked": [self.calculator_file],
395 | "relevant_files": [self.calculator_file],
396 | "relevant_context": ["add", "subtract", "multiply", "divide", "calculate_percentage", "power"],
397 | "confidence": "certain", # This should skip expert analysis
398 | "model": "flash",
399 | },
400 | )
401 |
402 | if not response_certain:
403 | self.logger.error("Failed to test certain confidence")
404 | return False
405 |
406 | response_certain_data = self._parse_testgen_response(response_certain)
407 | if not response_certain_data:
408 | return False
409 |
410 | # Validate certain confidence response - should skip expert analysis
411 | if response_certain_data.get("status") != "test_generation_complete_ready_for_implementation":
412 | self.logger.error(
413 | f"Expected status 'test_generation_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
414 | )
415 | return False
416 |
417 | if not response_certain_data.get("skip_expert_analysis"):
418 | self.logger.error("Expected skip_expert_analysis=true for certain confidence")
419 | return False
420 |
421 | expert_analysis = response_certain_data.get("expert_analysis", {})
422 | if expert_analysis.get("status") != "skipped_due_to_certain_test_confidence":
423 | self.logger.error("Expert analysis should be skipped for certain confidence")
424 | return False
425 |
426 | self.logger.info(" ✅ Certain confidence behavior working correctly")
427 | return True
428 |
429 | except Exception as e:
430 | self.logger.error(f"Certain confidence test failed: {e}")
431 | return False
432 |
433 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
434 | """Call an MCP tool in-process - override for testgen-specific response handling"""
435 | # Use in-process implementation to maintain conversation memory
436 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
437 |
438 | if not response_text:
439 | return None, None
440 |
441 | # Extract continuation_id from testgen response specifically
442 | continuation_id = self._extract_testgen_continuation_id(response_text)
443 |
444 | return response_text, continuation_id
445 |
446 | def _extract_testgen_continuation_id(self, response_text: str) -> Optional[str]:
447 | """Extract continuation_id from testgen response"""
448 | try:
449 | # Parse the response
450 | response_data = json.loads(response_text)
451 | return response_data.get("continuation_id")
452 |
453 | except json.JSONDecodeError as e:
454 | self.logger.debug(f"Failed to parse response for testgen continuation_id: {e}")
455 | return None
456 |
457 | def _parse_testgen_response(self, response_text: str) -> dict:
458 | """Parse testgen tool JSON response"""
459 | try:
460 | # Parse the response - it should be direct JSON
461 | return json.loads(response_text)
462 |
463 | except json.JSONDecodeError as e:
464 | self.logger.error(f"Failed to parse testgen response as JSON: {e}")
465 | self.logger.error(f"Response text: {response_text[:500]}...")
466 | return {}
467 |
468 | def _validate_step_response(
469 | self,
470 | response_data: dict,
471 | expected_step: int,
472 | expected_total: int,
473 | expected_next_required: bool,
474 | expected_status: str,
475 | ) -> bool:
476 | """Validate a test generation step response structure"""
477 | try:
478 | # Check status
479 | if response_data.get("status") != expected_status:
480 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
481 | return False
482 |
483 | # Check step number
484 | if response_data.get("step_number") != expected_step:
485 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
486 | return False
487 |
488 | # Check total steps
489 | if response_data.get("total_steps") != expected_total:
490 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
491 | return False
492 |
493 | # Check next_step_required
494 | if response_data.get("next_step_required") != expected_next_required:
495 | self.logger.error(
496 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
497 | )
498 | return False
499 |
500 | # Check test_generation_status exists
501 | if "test_generation_status" not in response_data:
502 | self.logger.error("Missing test_generation_status in response")
503 | return False
504 |
505 | # Check next_steps guidance
506 | if not response_data.get("next_steps"):
507 | self.logger.error("Missing next_steps guidance in response")
508 | return False
509 |
510 | return True
511 |
512 | except Exception as e:
513 | self.logger.error(f"Error validating step response: {e}")
514 | return False
515 |
516 | def _test_context_aware_file_embedding(self) -> bool:
517 | """Test context-aware file embedding optimization"""
518 | try:
519 | self.logger.info(" 1.5: Testing context-aware file embedding")
520 |
521 | # Create additional test files
522 | utils_code = """#!/usr/bin/env python3
523 | def validate_number(n):
524 | \"\"\"Validate if input is a number\"\"\"
525 | return isinstance(n, (int, float))
526 |
527 | def format_result(result):
528 | \"\"\"Format calculation result\"\"\"
529 | if isinstance(result, float):
530 | return round(result, 2)
531 | return result
532 | """
533 |
534 | math_helpers_code = """#!/usr/bin/env python3
535 | import math
536 |
537 | def factorial(n):
538 | \"\"\"Calculate factorial of n\"\"\"
539 | if n < 0:
540 | raise ValueError("Factorial not defined for negative numbers")
541 | return math.factorial(n)
542 |
543 | def is_prime(n):
544 | \"\"\"Check if number is prime\"\"\"
545 | if n < 2:
546 | return False
547 | for i in range(2, int(n**0.5) + 1):
548 | if n % i == 0:
549 | return False
550 | return True
551 | """
552 |
553 | # Create test files
554 | utils_file = self.create_additional_test_file("utils.py", utils_code)
555 | math_file = self.create_additional_test_file("math_helpers.py", math_helpers_code)
556 |
557 | # Test 1: New conversation, intermediate step - should only reference files
558 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
559 | response1, continuation_id = self.call_mcp_tool(
560 | "testgen",
561 | {
562 | "step": "Starting test generation for utility modules",
563 | "step_number": 1,
564 | "total_steps": 3,
565 | "next_step_required": True, # Intermediate step
566 | "findings": "Initial analysis of utility functions",
567 | "files_checked": [utils_file, math_file],
568 | "relevant_files": [utils_file], # This should be referenced, not embedded
569 | "relevant_context": ["validate_number", "format_result"],
570 | "confidence": "low",
571 | "model": "flash",
572 | },
573 | )
574 |
575 | if not response1 or not continuation_id:
576 | self.logger.error("Failed to start context-aware file embedding test")
577 | return False
578 |
579 | response1_data = self._parse_testgen_response(response1)
580 | if not response1_data:
581 | return False
582 |
583 | # Check file context - should be reference_only for intermediate step
584 | file_context = response1_data.get("file_context", {})
585 | if file_context.get("type") != "reference_only":
586 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
587 | return False
588 |
589 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
590 |
591 | # Test 2: Final step - should embed files for expert analysis
592 | self.logger.info(" 1.5.2: Final step (should embed files)")
593 | response2, _ = self.call_mcp_tool(
594 | "testgen",
595 | {
596 | "step": "Test planning complete - all test scenarios identified",
597 | "step_number": 2,
598 | "total_steps": 2,
599 | "next_step_required": False, # Final step - should embed files
600 | "continuation_id": continuation_id,
601 | "findings": "Complete test plan for all utility functions with edge cases",
602 | "files_checked": [utils_file, math_file],
603 | "relevant_files": [utils_file, math_file], # Should be fully embedded
604 | "relevant_context": ["validate_number", "format_result", "factorial", "is_prime"],
605 | "confidence": "high",
606 | "model": "flash",
607 | },
608 | )
609 |
610 | if not response2:
611 | self.logger.error("Failed to complete to final step")
612 | return False
613 |
614 | response2_data = self._parse_testgen_response(response2)
615 | if not response2_data:
616 | return False
617 |
618 | # Check file context - should be fully_embedded for final step
619 | file_context2 = response2_data.get("file_context", {})
620 | if file_context2.get("type") != "fully_embedded":
621 | self.logger.error(
622 | f"Expected fully_embedded file context for final step, got: {file_context2.get('type')}"
623 | )
624 | return False
625 |
626 | # Verify expert analysis was called for final step
627 | if response2_data.get("status") != "calling_expert_analysis":
628 | self.logger.error("Final step should trigger expert analysis")
629 | return False
630 |
631 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
632 | return True
633 |
634 | except Exception as e:
635 | self.logger.error(f"Context-aware file embedding test failed: {e}")
636 | return False
637 |
638 | def _test_multi_step_test_planning(self) -> bool:
639 | """Test multi-step test planning with complex code"""
640 | try:
641 | self.logger.info(" 1.6: Testing multi-step test planning")
642 |
643 | # Create a complex class to test
644 | complex_code = """#!/usr/bin/env python3
645 | import asyncio
646 | from typing import List, Dict, Optional
647 |
648 | class DataProcessor:
649 | \"\"\"Complex data processor with async operations\"\"\"
650 |
651 | def __init__(self, batch_size: int = 100):
652 | self.batch_size = batch_size
653 | self.processed_count = 0
654 | self.error_count = 0
655 | self.cache: Dict[str, any] = {}
656 |
657 | async def process_batch(self, items: List[dict]) -> List[dict]:
658 | \"\"\"Process a batch of items asynchronously\"\"\"
659 | if not items:
660 | return []
661 |
662 | if len(items) > self.batch_size:
663 | raise ValueError(f"Batch size {len(items)} exceeds limit {self.batch_size}")
664 |
665 | results = []
666 | for item in items:
667 | try:
668 | result = await self._process_single_item(item)
669 | results.append(result)
670 | self.processed_count += 1
671 | except Exception as e:
672 | self.error_count += 1
673 | results.append({"error": str(e), "item": item})
674 |
675 | return results
676 |
677 | async def _process_single_item(self, item: dict) -> dict:
678 | \"\"\"Process a single item with caching\"\"\"
679 | item_id = item.get('id')
680 | if not item_id:
681 | raise ValueError("Item must have an ID")
682 |
683 | # Check cache
684 | if item_id in self.cache:
685 | return self.cache[item_id]
686 |
687 | # Simulate async processing
688 | await asyncio.sleep(0.01)
689 |
690 | processed = {
691 | 'id': item_id,
692 | 'processed': True,
693 | 'value': item.get('value', 0) * 2
694 | }
695 |
696 | # Cache result
697 | self.cache[item_id] = processed
698 | return processed
699 |
700 | def get_stats(self) -> Dict[str, int]:
701 | \"\"\"Get processing statistics\"\"\"
702 | return {
703 | 'processed': self.processed_count,
704 | 'errors': self.error_count,
705 | 'cache_size': len(self.cache),
706 | 'success_rate': self.processed_count / (self.processed_count + self.error_count) if (self.processed_count + self.error_count) > 0 else 0
707 | }
708 | """
709 |
710 | # Create test file
711 | processor_file = self.create_additional_test_file("data_processor.py", complex_code)
712 |
713 | # Step 1: Start investigation
714 | self.logger.info(" 1.6.1: Step 1 - Start complex test planning")
715 | response1, continuation_id = self.call_mcp_tool(
716 | "testgen",
717 | {
718 | "step": "Analyzing complex DataProcessor class for comprehensive test generation",
719 | "step_number": 1,
720 | "total_steps": 4,
721 | "next_step_required": True,
722 | "findings": "DataProcessor is an async class with caching, error handling, and statistics. Need async test patterns.",
723 | "files_checked": [processor_file],
724 | "relevant_files": [processor_file],
725 | "relevant_context": ["DataProcessor", "process_batch", "_process_single_item", "get_stats"],
726 | "confidence": "low",
727 | "model": "flash",
728 | },
729 | )
730 |
731 | if not response1 or not continuation_id:
732 | self.logger.error("Failed to start multi-step test planning")
733 | return False
734 |
735 | response1_data = self._parse_testgen_response(response1)
736 |
737 | # Validate step 1
738 | file_context1 = response1_data.get("file_context", {})
739 | if file_context1.get("type") != "reference_only":
740 | self.logger.error("Step 1 should use reference_only file context")
741 | return False
742 |
743 | self.logger.info(" ✅ Step 1: Started complex test planning")
744 |
745 | # Step 2: Analyze async patterns
746 | self.logger.info(" 1.6.2: Step 2 - Async pattern analysis")
747 | response2, _ = self.call_mcp_tool(
748 | "testgen",
749 | {
750 | "step": "Analyzing async patterns and edge cases for testing",
751 | "step_number": 2,
752 | "total_steps": 4,
753 | "next_step_required": True,
754 | "continuation_id": continuation_id,
755 | "findings": "Key test areas: async batch processing, cache behavior, error handling, batch size limits, empty items, statistics calculation",
756 | "files_checked": [processor_file],
757 | "relevant_files": [processor_file],
758 | "relevant_context": ["process_batch", "_process_single_item"],
759 | "confidence": "medium",
760 | "model": "flash",
761 | },
762 | )
763 |
764 | if not response2:
765 | self.logger.error("Failed to continue to step 2")
766 | return False
767 |
768 | self.logger.info(" ✅ Step 2: Async patterns analyzed")
769 |
770 | # Step 3: Edge case identification
771 | self.logger.info(" 1.6.3: Step 3 - Edge case identification")
772 | response3, _ = self.call_mcp_tool(
773 | "testgen",
774 | {
775 | "step": "Identifying all edge cases and boundary conditions",
776 | "step_number": 3,
777 | "total_steps": 4,
778 | "next_step_required": True,
779 | "continuation_id": continuation_id,
780 | "findings": "Edge cases: empty batch, oversized batch, items without ID, cache hits/misses, concurrent processing, error accumulation",
781 | "files_checked": [processor_file],
782 | "relevant_files": [processor_file],
783 | "confidence": "high",
784 | "model": "flash",
785 | },
786 | )
787 |
788 | if not response3:
789 | self.logger.error("Failed to continue to step 3")
790 | return False
791 |
792 | self.logger.info(" ✅ Step 3: Edge cases identified")
793 |
794 | # Step 4: Final test plan with expert analysis
795 | self.logger.info(" 1.6.4: Step 4 - Complete test plan")
796 | response4, _ = self.call_mcp_tool(
797 | "testgen",
798 | {
799 | "step": "Test planning complete with comprehensive coverage strategy",
800 | "step_number": 4,
801 | "total_steps": 4,
802 | "next_step_required": False, # Final step
803 | "continuation_id": continuation_id,
804 | "findings": "Complete async test suite plan: unit tests for each method, integration tests for batch processing, edge case coverage, performance tests",
805 | "files_checked": [processor_file],
806 | "relevant_files": [processor_file],
807 | "confidence": "high",
808 | "model": "flash",
809 | },
810 | )
811 |
812 | if not response4:
813 | self.logger.error("Failed to complete to final step")
814 | return False
815 |
816 | response4_data = self._parse_testgen_response(response4)
817 |
818 | # Validate final step
819 | if response4_data.get("status") != "calling_expert_analysis":
820 | self.logger.error("Final step should trigger expert analysis")
821 | return False
822 |
823 | file_context4 = response4_data.get("file_context", {})
824 | if file_context4.get("type") != "fully_embedded":
825 | self.logger.error("Final step should use fully_embedded file context")
826 | return False
827 |
828 | self.logger.info(" ✅ Multi-step test planning completed successfully")
829 | return True
830 |
831 | except Exception as e:
832 | self.logger.error(f"Multi-step test planning test failed: {e}")
833 | return False
834 |
```
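A quick reference for the request shape exercised by every helper above: a minimal sketch (not part of the test suite) of the two testgen payloads that matter most — an intermediate step, where files are only referenced, and a final step, which embeds files and triggers expert analysis. Field names mirror the dictionaries passed to `call_mcp_tool()`; the file names, findings text, and continuation id are illustrative placeholders, and `confidence: "certain"` (as in `_test_certain_confidence`) would skip the expert call instead.

```python
# Sketch only: shows the payload shape used by the tests above, not a live MCP call.
import json

intermediate_step = {
    "step": "Analyzing module for test generation",
    "step_number": 1,
    "total_steps": 2,
    "next_step_required": True,      # intermediate -> file_context.type == "reference_only"
    "findings": "Functions and edge cases identified",
    "files_checked": ["calculator.py"],          # placeholder path
    "relevant_files": ["calculator.py"],
    "relevant_context": ["add", "divide"],
    "confidence": "low",
    "model": "flash",
}

final_step = {
    **intermediate_step,
    "step": "Test planning complete",
    "step_number": 2,
    "next_step_required": False,     # final -> files fully embedded, expert analysis called
    "confidence": "high",            # "certain" would skip expert analysis instead
    "continuation_id": "<id returned by step 1>",   # placeholder
}

print(json.dumps(final_step, indent=2))
```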
--------------------------------------------------------------------------------
/tests/test_model_restrictions.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for model restriction functionality."""
2 |
3 | import os
4 | from unittest.mock import MagicMock, patch
5 |
6 | import pytest
7 |
8 | from providers.gemini import GeminiModelProvider
9 | from providers.openai import OpenAIModelProvider
10 | from providers.shared import ProviderType
11 | from utils.model_restrictions import ModelRestrictionService
12 |
13 |
14 | class TestModelRestrictionService:
15 | """Test cases for ModelRestrictionService."""
16 |
17 | def test_no_restrictions_by_default(self):
18 | """Test that no restrictions exist when env vars are not set."""
19 | with patch.dict(os.environ, {}, clear=True):
20 | service = ModelRestrictionService()
21 |
22 | # Should allow all models
23 | assert service.is_allowed(ProviderType.OPENAI, "o3")
24 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
25 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-pro")
26 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-flash")
27 | assert service.is_allowed(ProviderType.OPENROUTER, "anthropic/claude-opus-4")
28 | assert service.is_allowed(ProviderType.OPENROUTER, "openai/o3")
29 |
30 | # Should have no restrictions
31 | assert not service.has_restrictions(ProviderType.OPENAI)
32 | assert not service.has_restrictions(ProviderType.GOOGLE)
33 | assert not service.has_restrictions(ProviderType.OPENROUTER)
34 |
35 | def test_load_single_model_restriction(self):
36 | """Test loading a single allowed model."""
37 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini"}):
38 | service = ModelRestrictionService()
39 |
40 | # Should only allow o3-mini
41 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
42 | assert not service.is_allowed(ProviderType.OPENAI, "o3")
43 | assert not service.is_allowed(ProviderType.OPENAI, "o4-mini")
44 |
45 | # Google and OpenRouter should have no restrictions
46 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-pro")
47 | assert service.is_allowed(ProviderType.OPENROUTER, "anthropic/claude-opus-4")
48 |
49 | def test_load_multiple_models_restriction(self):
50 | """Test loading multiple allowed models."""
51 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini,o4-mini", "GOOGLE_ALLOWED_MODELS": "flash,pro"}):
52 | # Instantiate providers so alias resolution for allow-lists is available
53 | openai_provider = OpenAIModelProvider(api_key="test-key")
54 | gemini_provider = GeminiModelProvider(api_key="test-key")
55 |
56 | from providers.registry import ModelProviderRegistry
57 |
58 | def fake_get_provider(provider_type, force_new=False):
59 | mapping = {
60 | ProviderType.OPENAI: openai_provider,
61 | ProviderType.GOOGLE: gemini_provider,
62 | }
63 | return mapping.get(provider_type)
64 |
65 | with patch.object(ModelProviderRegistry, "get_provider", side_effect=fake_get_provider):
66 |
67 | service = ModelRestrictionService()
68 |
69 | # Check OpenAI models
70 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
71 | assert service.is_allowed(ProviderType.OPENAI, "o4-mini")
72 | assert not service.is_allowed(ProviderType.OPENAI, "o3")
73 |
74 | # Check Google models
75 | assert service.is_allowed(ProviderType.GOOGLE, "flash")
76 | assert service.is_allowed(ProviderType.GOOGLE, "pro")
77 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-pro")
78 |
79 | def test_case_insensitive_and_whitespace_handling(self):
80 | """Test that model names are case-insensitive and whitespace is trimmed."""
81 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": " O3-MINI , o4-Mini "}):
82 | service = ModelRestrictionService()
83 |
84 | # Should work with any case
85 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
86 | assert service.is_allowed(ProviderType.OPENAI, "O3-MINI")
87 | assert service.is_allowed(ProviderType.OPENAI, "o4-mini")
88 | assert service.is_allowed(ProviderType.OPENAI, "O4-Mini")
89 |
90 | def test_empty_string_allows_all(self):
91 | """Test that empty string allows all models (same as unset)."""
92 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "", "GOOGLE_ALLOWED_MODELS": "flash"}):
93 | service = ModelRestrictionService()
94 |
95 | # OpenAI should allow all models (empty string = no restrictions)
96 | assert service.is_allowed(ProviderType.OPENAI, "o3")
97 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
98 | assert service.is_allowed(ProviderType.OPENAI, "o4-mini")
99 |
100 | # Google should only allow flash (and its resolved name)
101 | assert service.is_allowed(ProviderType.GOOGLE, "flash")
102 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-flash", "flash")
103 | assert not service.is_allowed(ProviderType.GOOGLE, "pro")
104 | assert not service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-pro", "pro")
105 |
106 | def test_filter_models(self):
107 | """Test filtering a list of models based on restrictions."""
108 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini,o4-mini"}):
109 | service = ModelRestrictionService()
110 |
111 | models = ["o3", "o3-mini", "o4-mini", "o3-pro"]
112 | filtered = service.filter_models(ProviderType.OPENAI, models)
113 |
114 | assert filtered == ["o3-mini", "o4-mini"]
115 |
116 | def test_get_allowed_models(self):
117 | """Test getting the set of allowed models."""
118 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini,o4-mini"}):
119 | service = ModelRestrictionService()
120 |
121 | allowed = service.get_allowed_models(ProviderType.OPENAI)
122 | assert allowed == {"o3-mini", "o4-mini"}
123 |
124 | # No restrictions for Google
125 | assert service.get_allowed_models(ProviderType.GOOGLE) is None
126 |
127 | def test_shorthand_names_in_restrictions(self):
128 | """Test that shorthand names work in restrictions."""
129 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o4mini,o3mini", "GOOGLE_ALLOWED_MODELS": "flash,pro"}):
130 | # Instantiate providers so the registry can resolve aliases
131 | OpenAIModelProvider(api_key="test-key")
132 | GeminiModelProvider(api_key="test-key")
133 |
134 | service = ModelRestrictionService()
135 |
136 | # When providers check models, they pass both resolved and original names
137 | # OpenAI: 'o4mini' shorthand allows o4-mini
138 | assert service.is_allowed(ProviderType.OPENAI, "o4-mini", "o4mini") # How providers actually call it
139 | assert service.is_allowed(ProviderType.OPENAI, "o4-mini") # Canonical should also be allowed
140 |
141 | # OpenAI: o3-mini allowed directly
142 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
143 | assert not service.is_allowed(ProviderType.OPENAI, "o3")
144 |
145 | # Google should allow both models via shorthands
146 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-flash", "flash")
147 | assert service.is_allowed(ProviderType.GOOGLE, "gemini-2.5-pro", "pro")
148 |
149 | # Also test that full names work when specified in restrictions
150 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini", "o3mini") # Even with shorthand
151 |
152 | def test_validation_against_known_models(self, caplog):
153 | """Test validation warnings for unknown models."""
154 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini,o4-mimi"}): # Note the typo: o4-mimi
155 | service = ModelRestrictionService()
156 |
157 | # Create mock provider with known models
158 | mock_provider = MagicMock()
159 | mock_provider.MODEL_CAPABILITIES = {
160 | "o3": {"context_window": 200000},
161 | "o3-mini": {"context_window": 200000},
162 | "o4-mini": {"context_window": 200000},
163 | }
164 | mock_provider.list_models.return_value = ["o3", "o3-mini", "o4-mini"]
165 |
166 | provider_instances = {ProviderType.OPENAI: mock_provider}
167 | service.validate_against_known_models(provider_instances)
168 |
169 | # Should have logged a warning about the typo
170 | assert "o4-mimi" in caplog.text
171 | assert "not a recognized" in caplog.text
172 |
173 | def test_openrouter_model_restrictions(self):
174 | """Test OpenRouter model restrictions functionality."""
175 | with patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "opus,sonnet"}):
176 | service = ModelRestrictionService()
177 |
178 | # Should only allow specified OpenRouter models
179 | assert service.is_allowed(ProviderType.OPENROUTER, "opus")
180 | assert service.is_allowed(ProviderType.OPENROUTER, "sonnet")
181 | assert service.is_allowed(ProviderType.OPENROUTER, "anthropic/claude-opus-4", "opus") # With original name
182 | assert not service.is_allowed(ProviderType.OPENROUTER, "haiku")
183 | assert not service.is_allowed(ProviderType.OPENROUTER, "anthropic/claude-3-haiku")
184 | assert not service.is_allowed(ProviderType.OPENROUTER, "mistral-large")
185 |
186 | # Other providers should have no restrictions
187 | assert service.is_allowed(ProviderType.OPENAI, "o3")
188 | assert service.is_allowed(ProviderType.GOOGLE, "pro")
189 |
190 | # Should have restrictions for OpenRouter
191 | assert service.has_restrictions(ProviderType.OPENROUTER)
192 | assert not service.has_restrictions(ProviderType.OPENAI)
193 | assert not service.has_restrictions(ProviderType.GOOGLE)
194 |
195 | def test_openrouter_filter_models(self):
196 | """Test filtering OpenRouter models based on restrictions."""
197 | with patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "opus,mistral"}):
198 | service = ModelRestrictionService()
199 |
200 | models = ["opus", "sonnet", "haiku", "mistral", "llama"]
201 | filtered = service.filter_models(ProviderType.OPENROUTER, models)
202 |
203 | assert filtered == ["opus", "mistral"]
204 |
205 | def test_combined_provider_restrictions(self):
206 | """Test that restrictions work correctly when set for multiple providers."""
207 | with patch.dict(
208 | os.environ,
209 | {
210 | "OPENAI_ALLOWED_MODELS": "o3-mini",
211 | "GOOGLE_ALLOWED_MODELS": "flash",
212 | "OPENROUTER_ALLOWED_MODELS": "opus,sonnet",
213 | },
214 | ):
215 | service = ModelRestrictionService()
216 |
217 | # OpenAI restrictions
218 | assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
219 | assert not service.is_allowed(ProviderType.OPENAI, "o3")
220 |
221 | # Google restrictions
222 | assert service.is_allowed(ProviderType.GOOGLE, "flash")
223 | assert not service.is_allowed(ProviderType.GOOGLE, "pro")
224 |
225 | # OpenRouter restrictions
226 | assert service.is_allowed(ProviderType.OPENROUTER, "opus")
227 | assert service.is_allowed(ProviderType.OPENROUTER, "sonnet")
228 | assert not service.is_allowed(ProviderType.OPENROUTER, "haiku")
229 |
230 | # All providers should have restrictions
231 | assert service.has_restrictions(ProviderType.OPENAI)
232 | assert service.has_restrictions(ProviderType.GOOGLE)
233 | assert service.has_restrictions(ProviderType.OPENROUTER)
234 |
235 |
236 | class TestProviderIntegration:
237 | """Test integration with actual providers."""
238 |
239 | @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini"})
240 | def test_openai_provider_respects_restrictions(self):
241 | """Test that OpenAI provider respects restrictions."""
242 | # Clear any cached restriction service
243 | import utils.model_restrictions
244 |
245 | utils.model_restrictions._restriction_service = None
246 |
247 | provider = OpenAIModelProvider(api_key="test-key")
248 |
249 | # Should validate allowed model
250 | assert provider.validate_model_name("o3-mini")
251 |
252 | # Should not validate disallowed model
253 | assert not provider.validate_model_name("o3")
254 |
255 | # get_capabilities should raise for disallowed model
256 | with pytest.raises(ValueError) as exc_info:
257 | provider.get_capabilities("o3")
258 | assert "not allowed by restriction policy" in str(exc_info.value)
259 |
260 | @patch.dict(os.environ, {"GOOGLE_ALLOWED_MODELS": "gemini-2.5-flash,flash"})
261 | def test_gemini_provider_respects_restrictions(self):
262 | """Test that Gemini provider respects restrictions."""
263 | # Clear any cached restriction service
264 | import utils.model_restrictions
265 |
266 | utils.model_restrictions._restriction_service = None
267 |
268 | provider = GeminiModelProvider(api_key="test-key")
269 |
270 | # Should validate allowed models (both shorthand and full name allowed)
271 | assert provider.validate_model_name("flash")
272 | assert provider.validate_model_name("gemini-2.5-flash")
273 |
274 | # Should not validate disallowed model
275 | assert not provider.validate_model_name("pro")
276 | assert not provider.validate_model_name("gemini-2.5-pro")
277 |
278 | # get_capabilities should raise for disallowed model
279 | with pytest.raises(ValueError) as exc_info:
280 | provider.get_capabilities("pro")
281 | assert "not allowed by restriction policy" in str(exc_info.value)
282 |
283 | @patch.dict(os.environ, {"GOOGLE_ALLOWED_MODELS": "flash"})
284 | def test_gemini_parameter_order_regression_protection(self):
285 | """Test that prevents regression of parameter order bug in is_allowed calls.
286 |
287 | This test specifically catches the bug where parameters were incorrectly
288 | passed as (provider, user_input, resolved_name) instead of
289 | (provider, resolved_name, user_input).
290 |
291 | The bug was subtle because the is_allowed method uses OR logic, so it
292 | worked in most cases by accident. This test creates a scenario where
293 | the parameter order matters.
294 | """
295 | # Clear any cached restriction service
296 | import utils.model_restrictions
297 |
298 | utils.model_restrictions._restriction_service = None
299 |
300 | provider = GeminiModelProvider(api_key="test-key")
301 |
302 | from providers.registry import ModelProviderRegistry
303 |
304 | with patch.object(ModelProviderRegistry, "get_provider", return_value=provider):
305 |
306 | # Test case: Only alias "flash" is allowed, not the full name
307 | # If parameters are in wrong order, this test will catch it
308 |
309 | # Should allow "flash" alias
310 | assert provider.validate_model_name("flash")
311 |
312 | # Should allow getting capabilities for "flash"
313 | capabilities = provider.get_capabilities("flash")
314 | assert capabilities.model_name == "gemini-2.5-flash"
315 |
316 | # Canonical form should also be allowed now that alias is on the allowlist
317 | assert provider.validate_model_name("gemini-2.5-flash")
318 | # Unrelated models remain blocked
319 | assert not provider.validate_model_name("pro")
320 | assert not provider.validate_model_name("gemini-2.5-pro")
321 |
322 | @patch.dict(os.environ, {"GOOGLE_ALLOWED_MODELS": "gemini-2.5-flash"})
323 | def test_gemini_parameter_order_edge_case_full_name_only(self):
324 | """Test parameter order with only full name allowed, not alias.
325 |
326 | This is the reverse scenario - only the full canonical name is allowed,
327 | not the shorthand alias. This tests that the parameter order is correct
328 | when resolving aliases.
329 | """
330 | # Clear any cached restriction service
331 | import utils.model_restrictions
332 |
333 | utils.model_restrictions._restriction_service = None
334 |
335 | provider = GeminiModelProvider(api_key="test-key")
336 |
337 | # Should allow full name
338 | assert provider.validate_model_name("gemini-2.5-flash")
339 |
340 | # Should also allow alias that resolves to allowed full name
341 | # This works because is_allowed checks both resolved_name and original_name
342 | assert provider.validate_model_name("flash")
343 |
344 | # Should not allow "pro" alias
345 | assert not provider.validate_model_name("pro")
346 | assert not provider.validate_model_name("gemini-2.5-pro")
347 |
348 |
349 | class TestCustomProviderOpenRouterRestrictions:
350 | """Test custom provider integration with OpenRouter restrictions."""
351 |
352 | @patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "opus,sonnet", "OPENROUTER_API_KEY": "test-key"})
353 | def test_custom_provider_respects_openrouter_restrictions(self):
354 | """Test that custom provider correctly defers OpenRouter models to OpenRouter provider."""
355 | # Clear any cached restriction service
356 | import utils.model_restrictions
357 |
358 | utils.model_restrictions._restriction_service = None
359 |
360 | from providers.custom import CustomProvider
361 |
362 | provider = CustomProvider(base_url="http://test.com/v1")
363 |
364 | # CustomProvider should NOT validate OpenRouter models - they should be deferred to OpenRouter
365 | assert not provider.validate_model_name("opus")
366 | assert not provider.validate_model_name("sonnet")
367 | assert not provider.validate_model_name("haiku")
368 |
369 | # Should still validate custom models defined in conf/custom_models.json
370 | assert provider.validate_model_name("local-llama")
371 |
372 | @patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "opus", "OPENROUTER_API_KEY": "test-key"})
373 | def test_custom_provider_openrouter_capabilities_restrictions(self):
374 | """Test that custom provider's get_capabilities correctly handles OpenRouter models."""
375 | # Clear any cached restriction service
376 | import utils.model_restrictions
377 |
378 | utils.model_restrictions._restriction_service = None
379 |
380 | from providers.custom import CustomProvider
381 |
382 | provider = CustomProvider(base_url="http://test.com/v1")
383 |
384 | # For OpenRouter models, CustomProvider should defer by raising
385 | with pytest.raises(ValueError):
386 | provider.get_capabilities("opus")
387 |
388 | # Should raise for disallowed OpenRouter model (still defers)
389 | with pytest.raises(ValueError):
390 | provider.get_capabilities("haiku")
391 |
392 | # Should still work for custom models
393 | capabilities = provider.get_capabilities("local-llama")
394 | assert capabilities.provider == ProviderType.CUSTOM
395 |
396 | @patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "opus"}, clear=False)
397 | def test_custom_provider_no_openrouter_key_ignores_restrictions(self):
398 | """Test that when OpenRouter key is not set, cloud models are rejected regardless of restrictions."""
399 | # Make sure OPENROUTER_API_KEY is not set
400 | if "OPENROUTER_API_KEY" in os.environ:
401 | del os.environ["OPENROUTER_API_KEY"]
402 | # Clear any cached restriction service
403 | import utils.model_restrictions
404 |
405 | utils.model_restrictions._restriction_service = None
406 |
407 | from providers.custom import CustomProvider
408 |
409 | provider = CustomProvider(base_url="http://test.com/v1")
410 |
411 | # Should not validate OpenRouter models when key is not available
412 | assert not provider.validate_model_name("opus") # Even though it's in allowed list
413 | assert not provider.validate_model_name("haiku")
414 |
415 | # Should still validate custom models
416 | assert provider.validate_model_name("local-llama")
417 |
418 | @patch.dict(os.environ, {"OPENROUTER_ALLOWED_MODELS": "", "OPENROUTER_API_KEY": "test-key"})
419 | def test_custom_provider_empty_restrictions_allows_all_openrouter(self):
420 | """Test that custom provider correctly defers OpenRouter models regardless of restrictions."""
421 | # Clear any cached restriction service
422 | import utils.model_restrictions
423 |
424 | utils.model_restrictions._restriction_service = None
425 |
426 | from providers.custom import CustomProvider
427 |
428 | provider = CustomProvider(base_url="http://test.com/v1")
429 |
430 | # CustomProvider should NOT validate OpenRouter models - they should be deferred to OpenRouter
431 | assert not provider.validate_model_name("opus")
432 | assert not provider.validate_model_name("sonnet")
433 | assert not provider.validate_model_name("haiku")
434 |
435 |
436 | class TestRegistryIntegration:
437 | """Test integration with ModelProviderRegistry."""
438 |
439 | @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "mini", "GOOGLE_ALLOWED_MODELS": "flash"})
440 | def test_registry_with_shorthand_restrictions(self):
441 | """Test that registry handles shorthand restrictions correctly."""
442 | # Clear cached restriction service
443 | import utils.model_restrictions
444 |
445 | utils.model_restrictions._restriction_service = None
446 |
447 | from providers.registry import ModelProviderRegistry
448 |
449 | # Clear registry cache
450 | ModelProviderRegistry.clear_cache()
451 |
452 | # Get available models with restrictions
453 | # This test documents current behavior - get_available_models doesn't handle aliases
454 | ModelProviderRegistry.get_available_models(respect_restrictions=True)
455 |
456 | # Currently, this will be empty because get_available_models doesn't
457 | # recognize that "mini" allows "o4-mini"
458 | # This is a known limitation that should be documented
459 |
460 | @patch("providers.registry.ModelProviderRegistry.get_provider")
461 | def test_get_available_models_respects_restrictions(self, mock_get_provider):
462 | """Test that registry filters models based on restrictions."""
463 | from providers.registry import ModelProviderRegistry
464 |
465 | # Mock providers
466 | mock_openai = MagicMock()
467 | mock_openai.MODEL_CAPABILITIES = {
468 | "o3": {"context_window": 200000},
469 | "o3-mini": {"context_window": 200000},
470 | }
471 | mock_openai.get_provider_type.return_value = ProviderType.OPENAI
472 |
473 | def openai_list_models(
474 | *,
475 | respect_restrictions: bool = True,
476 | include_aliases: bool = True,
477 | lowercase: bool = False,
478 | unique: bool = False,
479 | ):
480 | from utils.model_restrictions import get_restriction_service
481 |
482 | restriction_service = get_restriction_service() if respect_restrictions else None
483 | models = []
484 | for model_name, config in mock_openai.MODEL_CAPABILITIES.items():
485 | if isinstance(config, str):
486 | target_model = config
487 | if restriction_service and not restriction_service.is_allowed(ProviderType.OPENAI, target_model):
488 | continue
489 | if include_aliases:
490 | models.append(model_name)
491 | else:
492 | if restriction_service and not restriction_service.is_allowed(ProviderType.OPENAI, model_name):
493 | continue
494 | models.append(model_name)
495 | if lowercase:
496 | models = [m.lower() for m in models]
497 | if unique:
498 | seen = set()
499 | ordered = []
500 | for name in models:
501 | if name in seen:
502 | continue
503 | seen.add(name)
504 | ordered.append(name)
505 | models = ordered
506 | return models
507 |
508 | mock_openai.list_models = MagicMock(side_effect=openai_list_models)
509 |
510 | mock_gemini = MagicMock()
511 | mock_gemini.MODEL_CAPABILITIES = {
512 | "gemini-2.5-pro": {"context_window": 1048576},
513 | "gemini-2.5-flash": {"context_window": 1048576},
514 | }
515 | mock_gemini.get_provider_type.return_value = ProviderType.GOOGLE
516 |
517 | def gemini_list_models(
518 | *,
519 | respect_restrictions: bool = True,
520 | include_aliases: bool = True,
521 | lowercase: bool = False,
522 | unique: bool = False,
523 | ):
524 | from utils.model_restrictions import get_restriction_service
525 |
526 | restriction_service = get_restriction_service() if respect_restrictions else None
527 | models = []
528 | for model_name, config in mock_gemini.MODEL_CAPABILITIES.items():
529 | if isinstance(config, str):
530 | target_model = config
531 | if restriction_service and not restriction_service.is_allowed(ProviderType.GOOGLE, target_model):
532 | continue
533 | if include_aliases:
534 | models.append(model_name)
535 | else:
536 | if restriction_service and not restriction_service.is_allowed(ProviderType.GOOGLE, model_name):
537 | continue
538 | models.append(model_name)
539 | if lowercase:
540 | models = [m.lower() for m in models]
541 | if unique:
542 | seen = set()
543 | ordered = []
544 | for name in models:
545 | if name in seen:
546 | continue
547 | seen.add(name)
548 | ordered.append(name)
549 | models = ordered
550 | return models
551 |
552 | mock_gemini.list_models = MagicMock(side_effect=gemini_list_models)
553 |
554 | def get_provider_side_effect(provider_type):
555 | if provider_type == ProviderType.OPENAI:
556 | return mock_openai
557 | elif provider_type == ProviderType.GOOGLE:
558 | return mock_gemini
559 | return None
560 |
561 | mock_get_provider.side_effect = get_provider_side_effect
562 |
563 | # Set up registry with providers
564 | registry = ModelProviderRegistry()
565 | registry._providers = {
566 | ProviderType.OPENAI: type(mock_openai),
567 | ProviderType.GOOGLE: type(mock_gemini),
568 | }
569 |
570 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-mini", "GOOGLE_ALLOWED_MODELS": "gemini-2.5-flash"}):
571 | # Clear cached restriction service
572 | import utils.model_restrictions
573 |
574 | utils.model_restrictions._restriction_service = None
575 |
576 | available = ModelProviderRegistry.get_available_models(respect_restrictions=True)
577 |
578 | # Should only include allowed models
579 | assert "o3-mini" in available
580 | assert "o3" not in available
581 | assert "gemini-2.5-flash" in available
582 | assert "gemini-2.5-pro" not in available
583 |
584 |
585 | class TestShorthandRestrictions:
586 | """Test that shorthand model names work correctly in restrictions."""
587 |
588 | @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "mini", "GOOGLE_ALLOWED_MODELS": "flash"})
589 | def test_providers_validate_shorthands_correctly(self):
590 | """Test that providers correctly validate shorthand names."""
591 | # Clear cached restriction service
592 | import utils.model_restrictions
593 |
594 | utils.model_restrictions._restriction_service = None
595 |
596 | # Test OpenAI provider
597 | openai_provider = OpenAIModelProvider(api_key="test-key")
598 | gemini_provider = GeminiModelProvider(api_key="test-key")
599 |
600 | from providers.registry import ModelProviderRegistry
601 |
602 | def registry_side_effect(provider_type, force_new=False):
603 | mapping = {
604 | ProviderType.OPENAI: openai_provider,
605 | ProviderType.GOOGLE: gemini_provider,
606 | }
607 | return mapping.get(provider_type)
608 |
609 | with patch.object(ModelProviderRegistry, "get_provider", side_effect=registry_side_effect):
610 | assert openai_provider.validate_model_name("mini") # Should work with shorthand
611 | assert openai_provider.validate_model_name("gpt-5-mini") # Canonical resolved from shorthand
612 | assert not openai_provider.validate_model_name("o4-mini") # Unrelated model still blocked
613 | assert not openai_provider.validate_model_name("o3-mini")
614 |
615 | # Test Gemini provider
616 | assert gemini_provider.validate_model_name("flash") # Should work with shorthand
617 | assert gemini_provider.validate_model_name("gemini-2.5-flash") # Canonical allowed
618 | assert not gemini_provider.validate_model_name("pro") # Not allowed
619 |
620 | @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3mini,mini,o4-mini"})
621 | def test_multiple_shorthands_for_same_model(self):
622 | """Test that multiple shorthands work correctly."""
623 | # Clear cached restriction service
624 | import utils.model_restrictions
625 |
626 | utils.model_restrictions._restriction_service = None
627 |
628 | openai_provider = OpenAIModelProvider(api_key="test-key")
629 |
630 | # Both shorthands should work
631 | assert openai_provider.validate_model_name("mini") # mini -> o4-mini
632 | assert openai_provider.validate_model_name("o3mini") # o3mini -> o3-mini
633 |
634 | # Resolved names should be allowed when their shorthands are present
635 | assert openai_provider.validate_model_name("o4-mini") # Explicitly allowed
636 | assert openai_provider.validate_model_name("o3-mini") # Allowed via shorthand
637 |
638 | # Other models should not work
639 | assert not openai_provider.validate_model_name("o3")
640 | assert not openai_provider.validate_model_name("o3-pro")
641 |
642 | @patch.dict(
643 | os.environ,
644 | {"OPENAI_ALLOWED_MODELS": "mini,o4-mini", "GOOGLE_ALLOWED_MODELS": "flash,gemini-2.5-flash"},
645 | )
646 | def test_both_shorthand_and_full_name_allowed(self):
647 | """Test that we can allow both shorthand and full names."""
648 | # Clear cached restriction service
649 | import utils.model_restrictions
650 |
651 | utils.model_restrictions._restriction_service = None
652 |
653 | # OpenAI - both mini and o4-mini are allowed
654 | openai_provider = OpenAIModelProvider(api_key="test-key")
655 | assert openai_provider.validate_model_name("mini")
656 | assert openai_provider.validate_model_name("o4-mini")
657 |
658 | # Gemini - both flash and full name are allowed
659 | gemini_provider = GeminiModelProvider(api_key="test-key")
660 | assert gemini_provider.validate_model_name("flash")
661 | assert gemini_provider.validate_model_name("gemini-2.5-flash")
662 |
663 |
664 | class TestAutoModeWithRestrictions:
665 | """Test auto mode behavior with restrictions."""
666 |
667 | @patch("providers.registry.ModelProviderRegistry.get_provider")
668 | def test_fallback_model_respects_restrictions(self, mock_get_provider):
669 | """Test that fallback model selection respects restrictions."""
670 | from providers.registry import ModelProviderRegistry
671 | from tools.models import ToolModelCategory
672 |
673 | # Mock providers
674 | mock_openai = MagicMock()
675 | mock_openai.MODEL_CAPABILITIES = {
676 | "o3": {"context_window": 200000},
677 | "o3-mini": {"context_window": 200000},
678 | "o4-mini": {"context_window": 200000},
679 | }
680 | mock_openai.get_provider_type.return_value = ProviderType.OPENAI
681 |
682 | def openai_list_models(
683 | *,
684 | respect_restrictions: bool = True,
685 | include_aliases: bool = True,
686 | lowercase: bool = False,
687 | unique: bool = False,
688 | ):
689 | from utils.model_restrictions import get_restriction_service
690 |
691 | restriction_service = get_restriction_service() if respect_restrictions else None
692 | models = []
693 | for model_name, config in mock_openai.MODEL_CAPABILITIES.items():
694 | if isinstance(config, str):
695 | target_model = config
696 | if restriction_service and not restriction_service.is_allowed(ProviderType.OPENAI, target_model):
697 | continue
698 | if include_aliases:
699 | models.append(model_name)
700 | else:
701 | if restriction_service and not restriction_service.is_allowed(ProviderType.OPENAI, model_name):
702 | continue
703 | models.append(model_name)
704 | if lowercase:
705 | models = [m.lower() for m in models]
706 | if unique:
707 | seen = set()
708 | ordered = []
709 | for name in models:
710 | if name in seen:
711 | continue
712 | seen.add(name)
713 | ordered.append(name)
714 | models = ordered
715 | return models
716 |
717 | mock_openai.list_models = MagicMock(side_effect=openai_list_models)
718 |
719 | # Add get_preferred_model method to mock to match new implementation
720 | def get_preferred_model(category, allowed_models):
721 | # Simple preference logic for testing - just return first allowed model
722 | return allowed_models[0] if allowed_models else None
723 |
724 | mock_openai.get_preferred_model = get_preferred_model
725 |
726 | def get_provider_side_effect(provider_type):
727 | if provider_type == ProviderType.OPENAI:
728 | return mock_openai
729 | return None
730 |
731 | mock_get_provider.side_effect = get_provider_side_effect
732 |
733 | # Set up registry
734 | registry = ModelProviderRegistry()
735 | registry._providers = {ProviderType.OPENAI: type(mock_openai)}
736 |
737 | with patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o4-mini"}):
738 | # Clear cached restriction service
739 | import utils.model_restrictions
740 |
741 | utils.model_restrictions._restriction_service = None
742 |
743 | # Should pick o4-mini instead of o3-mini for fast response
744 | model = ModelProviderRegistry.get_preferred_fallback_model(ToolModelCategory.FAST_RESPONSE)
745 | assert model == "o4-mini"
746 |
747 | def test_fallback_with_shorthand_restrictions(self, monkeypatch):
748 | """Test fallback model selection with shorthand restrictions."""
749 | # Use monkeypatch to set environment variables with automatic cleanup
750 | monkeypatch.setenv("OPENAI_ALLOWED_MODELS", "mini")
751 | monkeypatch.setenv("GEMINI_API_KEY", "")
752 | monkeypatch.setenv("OPENAI_API_KEY", "test-key")
753 |
754 | # Clear caches and reset registry
755 | import utils.model_restrictions
756 | from providers.registry import ModelProviderRegistry
757 | from tools.models import ToolModelCategory
758 |
759 | utils.model_restrictions._restriction_service = None
760 |
761 | # Store original providers for restoration
762 | registry = ModelProviderRegistry()
763 | original_providers = registry._providers.copy()
764 | original_initialized = registry._initialized_providers.copy()
765 |
766 | try:
767 | # Clear registry and register only OpenAI and Gemini providers
768 | ModelProviderRegistry._instance = None
769 | from providers.gemini import GeminiModelProvider
770 | from providers.openai import OpenAIModelProvider
771 |
772 | ModelProviderRegistry.register_provider(ProviderType.OPENAI, OpenAIModelProvider)
773 | ModelProviderRegistry.register_provider(ProviderType.GOOGLE, GeminiModelProvider)
774 |
775 | # Even with "mini" restriction, fallback should work if provider handles it correctly
776 | # This tests the real-world scenario
777 | model = ModelProviderRegistry.get_preferred_fallback_model(ToolModelCategory.FAST_RESPONSE)
778 |
779 | # The fallback will depend on how get_available_models handles aliases
780 | # When "mini" is allowed, it's returned as the allowed model
781 | # "mini" is now an alias for gpt-5-mini, but the list shows "mini" itself
782 | assert model in ["mini", "gpt-5-mini", "o4-mini", "gemini-2.5-flash"]
783 | finally:
784 | # Restore original registry state
785 | registry = ModelProviderRegistry()
786 | registry._providers.clear()
787 | registry._initialized_providers.clear()
788 | registry._providers.update(original_providers)
789 | registry._initialized_providers.update(original_initialized)
790 |
```
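The tests above all drive `ModelRestrictionService` through the `*_ALLOWED_MODELS` environment variables. As a minimal sketch of that configuration path (assuming the repository root is importable; the model names and allow-list values are illustrative):

```python
# Sketch: configure allow-lists the same way the tests above do, then query the service.
import os

# Restriction env vars are read when ModelRestrictionService is constructed.
os.environ["OPENAI_ALLOWED_MODELS"] = "o3-mini,o4-mini"
os.environ["GOOGLE_ALLOWED_MODELS"] = "flash"

from providers.shared import ProviderType
from utils.model_restrictions import ModelRestrictionService

service = ModelRestrictionService()

assert service.is_allowed(ProviderType.OPENAI, "o3-mini")
assert not service.is_allowed(ProviderType.OPENAI, "o3")

# Filtering keeps only the allow-listed entries, preserving order.
print(service.filter_models(ProviderType.OPENAI, ["o3", "o3-mini", "o4-mini"]))
# expected: ['o3-mini', 'o4-mini']
```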
--------------------------------------------------------------------------------
/providers/openai_compatible.py:
--------------------------------------------------------------------------------
```python
1 | """Base class for OpenAI-compatible API providers."""
2 |
3 | import copy
4 | import ipaddress
5 | import logging
6 | from typing import Optional
7 | from urllib.parse import urlparse
8 |
9 | from openai import OpenAI
10 |
11 | from utils.env import get_env, suppress_env_vars
12 | from utils.image_utils import validate_image
13 |
14 | from .base import ModelProvider
15 | from .shared import (
16 | ModelCapabilities,
17 | ModelResponse,
18 | ProviderType,
19 | )
20 |
21 |
22 | class OpenAICompatibleProvider(ModelProvider):
23 | """Shared implementation for OpenAI API lookalikes.
24 |
25 | The class owns HTTP client configuration (timeouts, proxy hardening,
26 | custom headers) and normalises the OpenAI SDK responses into
27 | :class:`~providers.shared.ModelResponse`. Concrete subclasses only need to
28 | provide capability metadata and any provider-specific request tweaks.
29 | """
30 |
31 | DEFAULT_HEADERS = {}
32 | FRIENDLY_NAME = "OpenAI Compatible"
33 |
34 | def __init__(self, api_key: str, base_url: str = None, **kwargs):
35 | """Initialize the provider with API key and optional base URL.
36 |
37 | Args:
38 | api_key: API key for authentication
39 | base_url: Base URL for the API endpoint
40 | **kwargs: Additional configuration options including timeout
41 | """
42 | self._allowed_alias_cache: dict[str, str] = {}
43 | super().__init__(api_key, **kwargs)
44 | self._client = None
45 | self.base_url = base_url
46 | self.organization = kwargs.get("organization")
47 | self.allowed_models = self._parse_allowed_models()
48 |
49 | # Configure timeouts - especially important for custom/local endpoints
50 | self.timeout_config = self._configure_timeouts(**kwargs)
51 |
52 | # Validate base URL for security
53 | if self.base_url:
54 | self._validate_base_url()
55 |
56 | # Warn if using external URL without authentication
57 | if self.base_url and not self._is_localhost_url() and not api_key:
58 | logging.warning(
59 | f"Using external URL '{self.base_url}' without API key. "
60 | "This may be insecure. Consider setting an API key for authentication."
61 | )
62 |
63 | def _ensure_model_allowed(
64 | self,
65 | capabilities: ModelCapabilities,
66 | canonical_name: str,
67 | requested_name: str,
68 | ) -> None:
69 | """Respect provider-specific allowlists before default restriction checks."""
70 |
71 | super()._ensure_model_allowed(capabilities, canonical_name, requested_name)
72 |
73 | if self.allowed_models is not None:
74 | requested = requested_name.lower()
75 | canonical = canonical_name.lower()
76 |
77 | if requested not in self.allowed_models and canonical not in self.allowed_models:
78 | allowed = False
79 | for allowed_entry in list(self.allowed_models):
80 | normalized_resolved = self._allowed_alias_cache.get(allowed_entry)
81 | if normalized_resolved is None:
82 | try:
83 | resolved_name = self._resolve_model_name(allowed_entry)
84 | except Exception:
85 | continue
86 |
87 | if not resolved_name:
88 | continue
89 |
90 | normalized_resolved = resolved_name.lower()
91 | self._allowed_alias_cache[allowed_entry] = normalized_resolved
92 |
93 | if normalized_resolved == canonical:
94 | # Canonical match discovered via alias resolution – mark as allowed and
95 | # memoise the canonical entry for future lookups.
96 | allowed = True
97 | self._allowed_alias_cache[canonical] = canonical
98 | self.allowed_models.add(canonical)
99 | break
100 |
101 | if not allowed:
102 | raise ValueError(
103 | f"Model '{requested_name}' is not allowed by restriction policy. Allowed models: {sorted(self.allowed_models)}"
104 | )
105 |
106 | def _parse_allowed_models(self) -> Optional[set[str]]:
107 | """Parse allowed models from environment variable.
108 |
109 | Returns:
110 | Set of allowed model names (lowercase) or None if not configured
111 | """
112 | # Get provider-specific allowed models
113 | provider_type = self.get_provider_type().value.upper()
114 | env_var = f"{provider_type}_ALLOWED_MODELS"
115 | models_str = get_env(env_var, "") or ""
116 |
117 | if models_str:
118 | # Parse and normalize to lowercase for case-insensitive comparison
119 | models = {m.strip().lower() for m in models_str.split(",") if m.strip()}
120 | if models:
121 | logging.info(f"Configured allowed models for {self.FRIENDLY_NAME}: {sorted(models)}")
122 | self._allowed_alias_cache = {}
123 | return models
124 |
125 | # Log info if no allow-list configured for proxy providers
126 | if self.get_provider_type() not in [ProviderType.GOOGLE, ProviderType.OPENAI]:
127 | logging.info(
128 | f"Model allow-list not configured for {self.FRIENDLY_NAME} - all models permitted. "
129 | f"To restrict access, set {env_var} with comma-separated model names."
130 | )
131 |
132 | return None
133 |
134 | def _configure_timeouts(self, **kwargs):
135 | """Configure timeout settings based on provider type and custom settings.
136 |
137 | Custom URLs and local models often need longer timeouts due to:
138 | - Network latency on local networks
139 | - Extended thinking models taking longer to respond
140 | - Local inference being slower than cloud APIs
141 |
142 | Returns:
143 | httpx.Timeout object with appropriate timeout settings
144 | """
145 | import httpx
146 |
147 | # Default timeouts - more generous for custom/local endpoints
148 | default_connect = 30.0 # 30 seconds for connection (vs OpenAI's 5s)
149 | default_read = 600.0 # 10 minutes for reading (same as OpenAI default)
150 | default_write = 600.0 # 10 minutes for writing
151 | default_pool = 600.0 # 10 minutes for pool
152 |
153 | # For custom/local URLs, use even longer timeouts
154 | if self.base_url and self._is_localhost_url():
155 | default_connect = 60.0 # 1 minute for local connections
156 | default_read = 1800.0 # 30 minutes for local models (extended thinking)
157 | default_write = 1800.0 # 30 minutes for local models
158 | default_pool = 1800.0 # 30 minutes for local models
159 | logging.info(f"Using extended timeouts for local endpoint: {self.base_url}")
160 | elif self.base_url:
161 | default_connect = 45.0 # 45 seconds for custom remote endpoints
162 | default_read = 900.0 # 15 minutes for custom remote endpoints
163 | default_write = 900.0 # 15 minutes for custom remote endpoints
164 | default_pool = 900.0 # 15 minutes for custom remote endpoints
165 | logging.info(f"Using extended timeouts for custom endpoint: {self.base_url}")
166 |
167 |         # Allow overrides via kwargs first, then environment variables, then the defaults above
168 | connect_timeout = kwargs.get("connect_timeout")
169 | if connect_timeout is None:
170 | connect_timeout_raw = get_env("CUSTOM_CONNECT_TIMEOUT")
171 | connect_timeout = float(connect_timeout_raw) if connect_timeout_raw is not None else float(default_connect)
172 |
173 | read_timeout = kwargs.get("read_timeout")
174 | if read_timeout is None:
175 | read_timeout_raw = get_env("CUSTOM_READ_TIMEOUT")
176 | read_timeout = float(read_timeout_raw) if read_timeout_raw is not None else float(default_read)
177 |
178 | write_timeout = kwargs.get("write_timeout")
179 | if write_timeout is None:
180 | write_timeout_raw = get_env("CUSTOM_WRITE_TIMEOUT")
181 | write_timeout = float(write_timeout_raw) if write_timeout_raw is not None else float(default_write)
182 |
183 | pool_timeout = kwargs.get("pool_timeout")
184 | if pool_timeout is None:
185 | pool_timeout_raw = get_env("CUSTOM_POOL_TIMEOUT")
186 | pool_timeout = float(pool_timeout_raw) if pool_timeout_raw is not None else float(default_pool)
187 |
188 | timeout = httpx.Timeout(connect=connect_timeout, read=read_timeout, write=write_timeout, pool=pool_timeout)
189 |
190 | logging.debug(
191 | f"Configured timeouts - Connect: {connect_timeout}s, Read: {read_timeout}s, "
192 | f"Write: {write_timeout}s, Pool: {pool_timeout}s"
193 | )
194 |
195 | return timeout
196 |
197 | def _is_localhost_url(self) -> bool:
198 | """Check if the base URL points to localhost or local network.
199 |
200 | Returns:
201 | True if URL is localhost or local network, False otherwise
202 | """
203 | if not self.base_url:
204 | return False
205 |
206 | try:
207 | parsed = urlparse(self.base_url)
208 | hostname = parsed.hostname
209 |
210 | # Check for common localhost patterns
211 | if hostname in ["localhost", "127.0.0.1", "::1"]:
212 | return True
213 |
214 | # Check for private network ranges (local network)
215 | if hostname:
216 | try:
217 | ip = ipaddress.ip_address(hostname)
218 | return ip.is_private or ip.is_loopback
219 | except ValueError:
220 | # Not an IP address, might be a hostname
221 | pass
222 |
223 | return False
224 | except Exception:
225 | return False
226 |
227 | def _validate_base_url(self) -> None:
228 | """Validate base URL for security (SSRF protection).
229 |
230 | Raises:
231 | ValueError: If URL is invalid or potentially unsafe
232 | """
233 | if not self.base_url:
234 | return
235 |
236 | try:
237 | parsed = urlparse(self.base_url)
238 |
239 | # Check URL scheme - only allow http/https
240 | if parsed.scheme not in ("http", "https"):
241 | raise ValueError(f"Invalid URL scheme: {parsed.scheme}. Only http/https allowed.")
242 |
243 | # Check hostname exists
244 | if not parsed.hostname:
245 | raise ValueError("URL must include a hostname")
246 |
247 | # Check port is valid (if specified)
248 | port = parsed.port
249 | if port is not None and (port < 1 or port > 65535):
250 | raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.")
251 | except Exception as e:
252 | if isinstance(e, ValueError):
253 | raise
254 | raise ValueError(f"Invalid base URL '{self.base_url}': {str(e)}")
255 |
256 | @property
257 | def client(self):
258 | """Lazy initialization of OpenAI client with security checks and timeout configuration."""
259 | if self._client is None:
260 | import httpx
261 |
262 | proxy_env_vars = ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"]
263 |
264 | with suppress_env_vars(*proxy_env_vars):
265 | try:
266 | # Create a custom httpx client that explicitly avoids proxy parameters
267 | timeout_config = (
268 | self.timeout_config
269 | if hasattr(self, "timeout_config") and self.timeout_config
270 | else httpx.Timeout(30.0)
271 | )
272 |
273 | # Create httpx client with minimal config to avoid proxy conflicts
274 | # Note: proxies parameter was removed in httpx 0.28.0
275 | # Check for test transport injection
276 | if hasattr(self, "_test_transport"):
277 | # Use custom transport for testing (HTTP recording/replay)
278 | http_client = httpx.Client(
279 | transport=self._test_transport,
280 | timeout=timeout_config,
281 | follow_redirects=True,
282 | )
283 | else:
284 | # Normal production client
285 | http_client = httpx.Client(
286 | timeout=timeout_config,
287 | follow_redirects=True,
288 | )
289 |
290 | # Keep client initialization minimal to avoid proxy parameter conflicts
291 | client_kwargs = {
292 | "api_key": self.api_key,
293 | "http_client": http_client,
294 | }
295 |
296 | if self.base_url:
297 | client_kwargs["base_url"] = self.base_url
298 |
299 | if self.organization:
300 | client_kwargs["organization"] = self.organization
301 |
302 | # Add default headers if any
303 | if self.DEFAULT_HEADERS:
304 | client_kwargs["default_headers"] = self.DEFAULT_HEADERS.copy()
305 |
306 | logging.debug(
307 | "OpenAI client initialized with custom httpx client and timeout: %s",
308 | timeout_config,
309 | )
310 |
311 | # Create OpenAI client with custom httpx client
312 | self._client = OpenAI(**client_kwargs)
313 |
314 | except Exception as e:
315 | # If all else fails, try absolute minimal client without custom httpx
316 | logging.warning(
317 | "Failed to create client with custom httpx, falling back to minimal config: %s",
318 | e,
319 | )
320 | try:
321 | minimal_kwargs = {"api_key": self.api_key}
322 | if self.base_url:
323 | minimal_kwargs["base_url"] = self.base_url
324 | self._client = OpenAI(**minimal_kwargs)
325 | except Exception as fallback_error:
326 | logging.error("Even minimal OpenAI client creation failed: %s", fallback_error)
327 | raise
328 |
329 | return self._client
330 |
331 | def _sanitize_for_logging(self, params: dict) -> dict:
332 | """Sanitize sensitive data from parameters before logging.
333 |
334 | Args:
335 | params: Dictionary of API parameters
336 |
337 | Returns:
338 | dict: Sanitized copy of parameters safe for logging
339 | """
340 | sanitized = copy.deepcopy(params)
341 |
342 | # Sanitize messages content
343 | if "input" in sanitized:
344 | for msg in sanitized.get("input", []):
345 | if isinstance(msg, dict) and "content" in msg:
346 | for content_item in msg.get("content", []):
347 | if isinstance(content_item, dict) and "text" in content_item:
348 | # Truncate long text and add ellipsis
349 | text = content_item["text"]
350 | if len(text) > 100:
351 | content_item["text"] = text[:100] + "... [truncated]"
352 |
353 | # Remove any API keys that might be in headers/auth
354 | sanitized.pop("api_key", None)
355 | sanitized.pop("authorization", None)
356 |
357 | return sanitized
358 |
359 | def _safe_extract_output_text(self, response) -> str:
360 | """Safely extract output_text from o3-pro response with validation.
361 |
362 | Args:
363 | response: Response object from OpenAI SDK
364 |
365 | Returns:
366 | str: The output text content
367 |
368 | Raises:
369 | ValueError: If output_text is missing, None, or not a string
370 | """
371 | logging.debug(f"Response object type: {type(response)}")
372 | logging.debug(f"Response attributes: {dir(response)}")
373 |
374 | if not hasattr(response, "output_text"):
375 | raise ValueError(f"o3-pro response missing output_text field. Response type: {type(response).__name__}")
376 |
377 | content = response.output_text
378 | logging.debug(f"Extracted output_text: '{content}' (type: {type(content)})")
379 |
380 | if content is None:
381 | raise ValueError("o3-pro returned None for output_text")
382 |
383 | if not isinstance(content, str):
384 | raise ValueError(f"o3-pro output_text is not a string. Got type: {type(content).__name__}")
385 |
386 | return content
387 |
388 | def _generate_with_responses_endpoint(
389 | self,
390 | model_name: str,
391 | messages: list,
392 | temperature: float,
393 | max_output_tokens: Optional[int] = None,
394 | capabilities: Optional[ModelCapabilities] = None,
395 | **kwargs,
396 | ) -> ModelResponse:
397 | """Generate content using the /v1/responses endpoint for reasoning models."""
398 | # Convert messages to the correct format for responses endpoint
399 | input_messages = []
400 |
401 | for message in messages:
402 | role = message.get("role", "")
403 | content = message.get("content", "")
404 |
405 | if role == "system":
406 | # For o3-pro, system messages should be handled carefully to avoid policy violations
407 | # Instead of prefixing with "System:", we'll include the system content naturally
408 | input_messages.append({"role": "user", "content": [{"type": "input_text", "text": content}]})
409 | elif role == "user":
410 | input_messages.append({"role": "user", "content": [{"type": "input_text", "text": content}]})
411 | elif role == "assistant":
412 | input_messages.append({"role": "assistant", "content": [{"type": "output_text", "text": content}]})
413 |
414 | # Prepare completion parameters for responses endpoint
415 | # Based on OpenAI documentation, use nested reasoning object for responses endpoint
416 | effort = "medium"
417 | if capabilities and capabilities.default_reasoning_effort:
418 | effort = capabilities.default_reasoning_effort
419 |
420 | completion_params = {
421 | "model": model_name,
422 | "input": input_messages,
423 | "reasoning": {"effort": effort},
424 | "store": True,
425 | }
426 |
427 | # Add max tokens if specified (using max_completion_tokens for responses endpoint)
428 | if max_output_tokens:
429 | completion_params["max_completion_tokens"] = max_output_tokens
430 |
431 | # For responses endpoint, we only add parameters that are explicitly supported
432 | # Remove unsupported chat completion parameters that may cause API errors
433 |
434 | # Retry logic with progressive delays
435 | max_retries = 4
436 | retry_delays = [1, 3, 5, 8]
437 | attempt_counter = {"value": 0}
438 |
439 | def _attempt() -> ModelResponse:
440 | attempt_counter["value"] += 1
441 | import json
442 |
443 | sanitized_params = self._sanitize_for_logging(completion_params)
444 | logging.info(
445 | f"o3-pro API request (sanitized): {json.dumps(sanitized_params, indent=2, ensure_ascii=False)}"
446 | )
447 |
448 | response = self.client.responses.create(**completion_params)
449 |
450 | content = self._safe_extract_output_text(response)
451 |
452 | usage = None
453 | if hasattr(response, "usage"):
454 | usage = self._extract_usage(response)
455 | elif hasattr(response, "input_tokens") and hasattr(response, "output_tokens"):
456 | input_tokens = getattr(response, "input_tokens", 0) or 0
457 | output_tokens = getattr(response, "output_tokens", 0) or 0
458 | usage = {
459 | "input_tokens": input_tokens,
460 | "output_tokens": output_tokens,
461 | "total_tokens": input_tokens + output_tokens,
462 | }
463 |
464 | return ModelResponse(
465 | content=content,
466 | usage=usage,
467 | model_name=model_name,
468 | friendly_name=self.FRIENDLY_NAME,
469 | provider=self.get_provider_type(),
470 | metadata={
471 | "model": getattr(response, "model", model_name),
472 | "id": getattr(response, "id", ""),
473 | "created": getattr(response, "created_at", 0),
474 | "endpoint": "responses",
475 | },
476 | )
477 |
478 | try:
479 | return self._run_with_retries(
480 | operation=_attempt,
481 | max_attempts=max_retries,
482 | delays=retry_delays,
483 | log_prefix="responses endpoint",
484 | )
485 | except Exception as exc:
486 | attempts = max(attempt_counter["value"], 1)
487 | error_msg = f"responses endpoint error after {attempts} attempt{'s' if attempts > 1 else ''}: {exc}"
488 | logging.error(error_msg)
489 | raise RuntimeError(error_msg) from exc
490 |
491 | def generate_content(
492 | self,
493 | prompt: str,
494 | model_name: str,
495 | system_prompt: Optional[str] = None,
496 | temperature: float = 0.3,
497 | max_output_tokens: Optional[int] = None,
498 | images: Optional[list[str]] = None,
499 | **kwargs,
500 | ) -> ModelResponse:
501 | """Generate content using the OpenAI-compatible API.
502 |
503 | Args:
504 | prompt: User prompt to send to the model
505 | model_name: Canonical model name or its alias
506 | system_prompt: Optional system prompt for model behavior
507 | temperature: Sampling temperature
508 | max_output_tokens: Maximum tokens to generate
509 | images: Optional list of image paths or data URLs to include with the prompt (for vision models)
510 | **kwargs: Additional provider-specific parameters
511 |
512 | Returns:
513 | ModelResponse with generated content and metadata
514 | """
515 | # Validate model name against allow-list
516 | if not self.validate_model_name(model_name):
517 | raise ValueError(f"Model '{model_name}' not in allowed models list. Allowed models: {self.allowed_models}")
518 |
519 | capabilities: Optional[ModelCapabilities]
520 | try:
521 | capabilities = self.get_capabilities(model_name)
522 | except Exception as exc:
523 | logging.debug(f"Falling back to generic capabilities for {model_name}: {exc}")
524 | capabilities = None
525 |
526 | # Get effective temperature for this model from capabilities when available
527 | if capabilities:
528 | effective_temperature = capabilities.get_effective_temperature(temperature)
529 | if effective_temperature is not None and effective_temperature != temperature:
530 | logging.debug(
531 | f"Adjusting temperature from {temperature} to {effective_temperature} for model {model_name}"
532 | )
533 | else:
534 | effective_temperature = temperature
535 |
536 | # Only validate if temperature is not None (meaning the model supports it)
537 | if effective_temperature is not None:
538 | # Validate parameters with the effective temperature
539 | self.validate_parameters(model_name, effective_temperature)
540 |
541 | # Resolve to canonical model name
542 | resolved_model = self._resolve_model_name(model_name)
543 |
544 | # Prepare messages
545 | messages = []
546 | if system_prompt:
547 | messages.append({"role": "system", "content": system_prompt})
548 |
549 | # Prepare user message with text and potentially images
550 | user_content = []
551 | user_content.append({"type": "text", "text": prompt})
552 |
553 | # Add images if provided and model supports vision
554 | if images and capabilities and capabilities.supports_images:
555 | for image_path in images:
556 | try:
557 | image_content = self._process_image(image_path)
558 | if image_content:
559 | user_content.append(image_content)
560 | except Exception as e:
561 | logging.warning(f"Failed to process image {image_path}: {e}")
562 | # Continue with other images and text
563 | continue
564 | elif images and (not capabilities or not capabilities.supports_images):
565 | logging.warning(f"Model {resolved_model} does not support images, ignoring {len(images)} image(s)")
566 |
567 | # Add user message
568 | if len(user_content) == 1:
569 | # Only text content, use simple string format for compatibility
570 | messages.append({"role": "user", "content": prompt})
571 | else:
572 | # Text + images, use content array format
573 | messages.append({"role": "user", "content": user_content})
574 |
575 | # Prepare completion parameters
576 | # Always disable streaming for OpenRouter
577 | # MCP doesn't use streaming, and this avoids issues with O3 model access
578 | completion_params = {
579 | "model": resolved_model,
580 | "messages": messages,
581 | "stream": False,
582 | }
583 |
584 | # Use the effective temperature we calculated earlier
585 | supports_sampling = effective_temperature is not None
586 |
587 | if supports_sampling:
588 | completion_params["temperature"] = effective_temperature
589 |
590 | # Add max tokens if specified and model supports it
591 | # O3/O4 models that don't support temperature also don't support max_tokens
592 | if max_output_tokens and supports_sampling:
593 | completion_params["max_tokens"] = max_output_tokens
594 |
595 | # Add any additional OpenAI-specific parameters
596 | # Use capabilities to filter parameters for reasoning models
597 | for key, value in kwargs.items():
598 | if key in ["top_p", "frequency_penalty", "presence_penalty", "seed", "stop", "stream"]:
599 | # Reasoning models (those that don't support temperature) also don't support these parameters
600 | if not supports_sampling and key in ["top_p", "frequency_penalty", "presence_penalty", "stream"]:
601 | continue # Skip unsupported parameters for reasoning models
602 | completion_params[key] = value
603 |
604 | # Check if this model needs the Responses API endpoint
605 | # Prefer capability metadata; fall back to static map when capabilities unavailable
606 | use_responses_api = False
607 | if capabilities is not None:
608 | use_responses_api = getattr(capabilities, "use_openai_response_api", False)
609 | else:
610 | static_capabilities = self.get_all_model_capabilities().get(resolved_model)
611 | if static_capabilities is not None:
612 | use_responses_api = getattr(static_capabilities, "use_openai_response_api", False)
613 |
614 | if use_responses_api:
615 | # These models require the /v1/responses endpoint for stateful context
616 | # If it fails, we should not fall back to chat/completions
617 | return self._generate_with_responses_endpoint(
618 | model_name=resolved_model,
619 | messages=messages,
620 | temperature=temperature,
621 | max_output_tokens=max_output_tokens,
622 | capabilities=capabilities,
623 | **kwargs,
624 | )
625 |
626 | # Retry logic with progressive delays
627 | max_retries = 4 # Total of 4 attempts
628 | retry_delays = [1, 3, 5, 8] # Progressive delays: 1s, 3s, 5s, 8s
629 | attempt_counter = {"value": 0}
630 |
631 | def _attempt() -> ModelResponse:
632 | attempt_counter["value"] += 1
633 | response = self.client.chat.completions.create(**completion_params)
634 |
635 | content = response.choices[0].message.content
636 | usage = self._extract_usage(response)
637 |
638 | return ModelResponse(
639 | content=content,
640 | usage=usage,
641 | model_name=resolved_model,
642 | friendly_name=self.FRIENDLY_NAME,
643 | provider=self.get_provider_type(),
644 | metadata={
645 | "finish_reason": response.choices[0].finish_reason,
646 | "model": response.model,
647 | "id": response.id,
648 | "created": response.created,
649 | },
650 | )
651 |
652 | try:
653 | return self._run_with_retries(
654 | operation=_attempt,
655 | max_attempts=max_retries,
656 | delays=retry_delays,
657 | log_prefix=f"{self.FRIENDLY_NAME} API ({resolved_model})",
658 | )
659 | except Exception as exc:
660 | attempts = max(attempt_counter["value"], 1)
661 | error_msg = (
662 | f"{self.FRIENDLY_NAME} API error for model {resolved_model} after {attempts} attempt"
663 | f"{'s' if attempts > 1 else ''}: {exc}"
664 | )
665 | logging.error(error_msg)
666 | raise RuntimeError(error_msg) from exc
667 |
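    # Illustrative sketch of how generate_content() is typically driven; the variable
    # names and model alias below are invented, and the real call sites live in the tools:
    #
    #   response = provider.generate_content(
    #       prompt="Summarise the attached diff",
    #       model_name="some-model-alias",          # resolved via _resolve_model_name()
    #       system_prompt="You are a code reviewer.",
    #       temperature=0.3,
    #   )
    #   print(response.content, response.usage, response.metadata)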
668 | def validate_parameters(self, model_name: str, temperature: float, **kwargs) -> None:
669 | """Validate model parameters.
670 |
671 | For proxy providers, this may use generic capabilities.
672 |
673 | Args:
674 | model_name: Canonical model name or its alias
675 | temperature: Temperature to validate
676 | **kwargs: Additional parameters to validate
677 | """
678 | try:
679 | capabilities = self.get_capabilities(model_name)
680 |
681 | # Check if we're using generic capabilities
682 | if hasattr(capabilities, "_is_generic"):
683 | logging.debug(
684 | f"Using generic parameter validation for {model_name}. Actual model constraints may differ."
685 | )
686 |
687 | # Validate temperature using parent class method
688 | super().validate_parameters(model_name, temperature, **kwargs)
689 |
690 | except Exception as e:
691 | # For proxy providers, we might not have accurate capabilities
692 | # Log warning but don't fail
693 | logging.warning(f"Parameter validation limited for {model_name}: {e}")
694 |
695 | def _extract_usage(self, response) -> dict[str, int]:
696 | """Extract token usage from OpenAI response.
697 |
698 | Args:
699 | response: OpenAI API response object
700 |
701 | Returns:
702 | Dictionary with usage statistics
703 | """
704 | usage = {}
705 |
706 | if hasattr(response, "usage") and response.usage:
707 | # Safely extract token counts with None handling
708 | usage["input_tokens"] = getattr(response.usage, "prompt_tokens", 0) or 0
709 | usage["output_tokens"] = getattr(response.usage, "completion_tokens", 0) or 0
710 | usage["total_tokens"] = getattr(response.usage, "total_tokens", 0) or 0
711 |
712 | return usage
713 |
714 | def count_tokens(self, text: str, model_name: str) -> int:
715 | """Count tokens using OpenAI-compatible tokenizer tables when available."""
716 |
717 | resolved_model = self._resolve_model_name(model_name)
718 |
719 | try:
720 | import tiktoken
721 |
722 | try:
723 | encoding = tiktoken.encoding_for_model(resolved_model)
724 | except KeyError:
725 | encoding = tiktoken.get_encoding("cl100k_base")
726 |
727 | return len(encoding.encode(text))
728 |
729 |         except Exception as exc:  # covers ImportError when tiktoken is missing
730 | logging.debug("tiktoken unavailable for %s: %s", resolved_model, exc)
731 |
732 | return super().count_tokens(text, model_name)
733 |
734 | def _is_error_retryable(self, error: Exception) -> bool:
735 | """Determine if an error should be retried based on structured error codes.
736 |
737 | Uses OpenAI API error structure instead of text pattern matching for reliability.
738 |
739 | Args:
740 | error: Exception from OpenAI API call
741 |
742 | Returns:
743 | True if error should be retried, False otherwise
744 | """
745 | error_str = str(error).lower()
746 |
747 | # Check for 429 errors first - these need special handling
748 | if "429" in error_str:
749 | # Try to extract structured error information
750 | error_type = None
751 | error_code = None
752 |
753 | # Parse structured error from OpenAI API response
754 | # Format: "Error code: 429 - {'error': {'type': 'tokens', 'code': 'rate_limit_exceeded', ...}}"
755 | try:
756 | import ast
757 | import json
758 | import re
759 |
760 | # Extract JSON part from error string using regex
761 | # Look for pattern: {...} (from first { to last })
762 | json_match = re.search(r"\{.*\}", str(error))
763 | if json_match:
764 | json_like_str = json_match.group(0)
765 |
766 | # First try: parse as Python literal (handles single quotes safely)
767 | try:
768 | error_data = ast.literal_eval(json_like_str)
769 | except (ValueError, SyntaxError):
770 | # Fallback: try JSON parsing with simple quote replacement
771 | # (for cases where it's already valid JSON or simple replacements work)
772 | json_str = json_like_str.replace("'", '"')
773 | error_data = json.loads(json_str)
774 |
775 | if "error" in error_data:
776 | error_info = error_data["error"]
777 | error_type = error_info.get("type")
778 | error_code = error_info.get("code")
779 |
780 | except (json.JSONDecodeError, ValueError, SyntaxError, AttributeError):
781 | # Fall back to checking hasattr for OpenAI SDK exception objects
782 | if hasattr(error, "response") and hasattr(error.response, "json"):
783 | try:
784 | response_data = error.response.json()
785 | if "error" in response_data:
786 | error_info = response_data["error"]
787 | error_type = error_info.get("type")
788 | error_code = error_info.get("code")
789 | except Exception:
790 | pass
791 |
792 | # Determine if 429 is retryable based on structured error codes
793 | if error_type == "tokens":
794 | # Token-related 429s are typically non-retryable (request too large)
795 | logging.debug(f"Non-retryable 429: token-related error (type={error_type}, code={error_code})")
796 | return False
797 | elif error_code in ["invalid_request_error", "context_length_exceeded"]:
798 | # These are permanent failures
799 | logging.debug(f"Non-retryable 429: permanent failure (type={error_type}, code={error_code})")
800 | return False
801 | else:
802 | # Other 429s (like requests per minute) are retryable
803 | logging.debug(f"Retryable 429: rate limiting (type={error_type}, code={error_code})")
804 | return True
805 |
806 | # For non-429 errors, check if they're retryable
807 | retryable_indicators = [
808 | "timeout",
809 | "connection",
810 | "network",
811 | "temporary",
812 | "unavailable",
813 | "retry",
814 | "408", # Request timeout
815 | "500", # Internal server error
816 | "502", # Bad gateway
817 | "503", # Service unavailable
818 | "504", # Gateway timeout
819 | "ssl", # SSL errors
820 | "handshake", # Handshake failures
821 | ]
822 |
823 | return any(indicator in error_str for indicator in retryable_indicators)
824 |
825 | def _process_image(self, image_path: str) -> Optional[dict]:
826 | """Process an image for OpenAI-compatible API."""
827 | try:
828 | if image_path.startswith("data:"):
829 | # Validate the data URL
830 | validate_image(image_path)
831 | # Handle data URL: data:image/png;base64,iVBORw0...
832 | return {"type": "image_url", "image_url": {"url": image_path}}
833 | else:
834 | # Use base class validation
835 | image_bytes, mime_type = validate_image(image_path)
836 |
837 | # Read and encode the image
838 | import base64
839 |
840 | image_data = base64.b64encode(image_bytes).decode()
841 | logging.debug(f"Processing image '{image_path}' as MIME type '{mime_type}'")
842 |
843 | # Create data URL for OpenAI API
844 | data_url = f"data:{mime_type};base64,{image_data}"
845 |
846 | return {"type": "image_url", "image_url": {"url": data_url}}
847 |
848 | except ValueError as e:
849 | logging.warning(str(e))
850 | return None
851 | except Exception as e:
852 | logging.error(f"Error processing image {image_path}: {e}")
853 | return None
854 |
```
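
The timeout and allow-list behaviour above is driven by constructor kwargs and environment variables. A minimal sketch of the environment-variable route, assuming `get_env` ultimately reads the process environment, and with the provider prefix and model names invented for illustration:

```python
import os

# Extend the HTTP timeouts consumed by _configure_timeouts(); values are seconds.
os.environ["CUSTOM_CONNECT_TIMEOUT"] = "60"
os.environ["CUSTOM_READ_TIMEOUT"] = "1800"   # long reads for slow local / extended-thinking models
os.environ["CUSTOM_WRITE_TIMEOUT"] = "1800"
os.environ["CUSTOM_POOL_TIMEOUT"] = "1800"

# Restrict which models _parse_allowed_models() accepts for a given provider.
# The variable is "<PROVIDER_TYPE>_ALLOWED_MODELS"; matching is case-insensitive and
# aliases are resolved, so either an alias or the canonical model name works.
os.environ["OPENROUTER_ALLOWED_MODELS"] = "some-alias,vendor/canonical-model"
```

Any request that then falls outside the allow-list fails in `_ensure_model_allowed` or `generate_content` with a `ValueError` listing the permitted models.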
--------------------------------------------------------------------------------
/tools/tracer.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tracer Workflow tool - Step-by-step code tracing and dependency analysis
3 |
4 | This tool provides a structured workflow for comprehensive code tracing and analysis.
5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
6 | to ensure thorough code examination, dependency mapping, and execution flow analysis before proceeding.
7 |
8 | The tracer guides users through sequential code analysis with full context awareness and
9 | the ability to revise and adapt as understanding deepens.
10 |
11 | Key features:
12 | - Sequential tracing with systematic investigation workflow
13 | - Support for precision tracing (execution flow) and dependencies tracing (structural relationships)
14 | - Self-contained completion with detailed output formatting instructions
15 | - Context-aware analysis that builds understanding step by step
16 | - No external expert analysis needed - provides comprehensive guidance internally
17 |
18 | Perfect for: method/function execution flow analysis, dependency mapping, call chain tracing,
19 | structural relationship analysis, architectural understanding, and code comprehension.
20 | """
21 |
22 | import logging
23 | from typing import TYPE_CHECKING, Any, Literal, Optional
24 |
25 | from pydantic import Field, field_validator
26 |
27 | if TYPE_CHECKING:
28 | from tools.models import ToolModelCategory
29 |
30 | from config import TEMPERATURE_ANALYTICAL
31 | from systemprompts import TRACER_PROMPT
32 | from tools.shared.base_models import WorkflowRequest
33 |
34 | from .workflow.base import WorkflowTool
35 |
36 | logger = logging.getLogger(__name__)
37 |
38 | # Tool-specific field descriptions for tracer workflow
39 | TRACER_WORKFLOW_FIELD_DESCRIPTIONS = {
40 | "step": (
41 | "The plan for the current tracing step. Step 1: State the tracing strategy. Later steps: Report findings and adapt the plan. "
42 | "CRITICAL: For 'precision' mode, focus on execution flow and call chains. For 'dependencies' mode, focus on structural relationships. "
43 | "If trace_mode is 'ask' in step 1, you MUST prompt the user to choose a mode."
44 | ),
45 | "step_number": (
46 | "The index of the current step in the tracing sequence, beginning at 1. Each step should build upon or "
47 | "revise the previous one."
48 | ),
49 | "total_steps": (
50 | "Your current estimate for how many steps will be needed to complete the tracing analysis. "
51 | "Adjust as new findings emerge."
52 | ),
53 | "next_step_required": (
54 | "Set to true if you plan to continue the investigation with another step. False means you believe the "
55 | "tracing analysis is complete and ready for final output formatting."
56 | ),
57 | "findings": (
58 | "Summary of discoveries from this step, including execution paths, dependency relationships, call chains, and structural patterns. "
59 | "IMPORTANT: Document both direct (immediate calls) and indirect (transitive, side effects) relationships."
60 | ),
61 | "files_checked": (
62 | "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
63 | ),
64 | "relevant_files": (
65 | "Subset of files_checked directly relevant to the tracing target (absolute paths). Include implementation files, "
66 | "dependencies, or files demonstrating key relationships."
67 | ),
68 | "relevant_context": (
69 | "List methods/functions central to the tracing analysis, in 'ClassName.methodName' or 'functionName' format. "
70 | "Prioritize those in the execution flow or dependency chain."
71 | ),
72 | "confidence": (
73 | "Your confidence in the tracing analysis. Use: 'exploring', 'low', 'medium', 'high', 'very_high', 'almost_certain', 'certain'. "
74 | "CRITICAL: 'certain' implies the analysis is 100% complete locally and PREVENTS external model validation."
75 | ),
76 | "trace_mode": "Type of tracing: 'ask' (default - prompts user to choose mode), 'precision' (execution flow) or 'dependencies' (structural relationships)",
77 | "target_description": (
78 | "Description of what to trace and WHY. Include context about what you're trying to understand or analyze."
79 | ),
80 | "images": ("Optional paths to architecture diagrams or flow charts that help understand the tracing context."),
81 | }
82 |
83 |
84 | class TracerRequest(WorkflowRequest):
85 | """Request model for tracer workflow investigation steps"""
86 |
87 | # Required fields for each investigation step
88 | step: str = Field(..., description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["step"])
89 | step_number: int = Field(..., description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
90 | total_steps: int = Field(..., description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
91 | next_step_required: bool = Field(..., description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
92 |
93 | # Investigation tracking fields
94 | findings: str = Field(..., description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
95 | files_checked: list[str] = Field(
96 | default_factory=list, description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
97 | )
98 | relevant_files: list[str] = Field(
99 | default_factory=list, description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
100 | )
101 | relevant_context: list[str] = Field(
102 | default_factory=list, description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
103 | )
104 | confidence: Optional[str] = Field("exploring", description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["confidence"])
105 |
106 | # Tracer-specific fields (used in step 1 to initialize)
107 | trace_mode: Optional[Literal["precision", "dependencies", "ask"]] = Field(
108 | "ask", description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["trace_mode"]
109 | )
110 | target_description: Optional[str] = Field(
111 | None, description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["target_description"]
112 | )
113 | images: Optional[list[str]] = Field(default=None, description=TRACER_WORKFLOW_FIELD_DESCRIPTIONS["images"])
114 |
115 | # Exclude fields not relevant to tracing workflow
116 | issues_found: list[dict] = Field(default_factory=list, exclude=True, description="Tracing doesn't track issues")
117 | hypothesis: Optional[str] = Field(default=None, exclude=True, description="Tracing doesn't use hypothesis")
118 | # Exclude other non-tracing fields
119 | temperature: Optional[float] = Field(default=None, exclude=True)
120 | thinking_mode: Optional[str] = Field(default=None, exclude=True)
121 | use_assistant_model: Optional[bool] = Field(default=False, exclude=True, description="Tracing is self-contained")
122 |
123 | @field_validator("step_number")
124 | @classmethod
125 | def validate_step_number(cls, v):
126 | if v < 1:
127 | raise ValueError("step_number must be at least 1")
128 | return v
129 |
130 | @field_validator("total_steps")
131 | @classmethod
132 | def validate_total_steps(cls, v):
133 | if v < 1:
134 | raise ValueError("total_steps must be at least 1")
135 | return v
136 |
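# Illustrative sketch of a first-step payload for the request model above; all field
# values are invented, and in practice the MCP client supplies them as JSON arguments:
#
#   TracerRequest(
#       step="Outline the tracing strategy for the target method",
#       step_number=1,
#       total_steps=3,
#       next_step_required=True,
#       findings="Starting the trace; nothing examined yet",
#       trace_mode="precision",
#       target_description="Understand how the target method is invoked",
#   )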
137 |
138 | class TracerTool(WorkflowTool):
139 | """
140 | Tracer workflow tool for step-by-step code tracing and dependency analysis.
141 |
142 | This tool implements a structured tracing workflow that guides users through
143 | methodical investigation steps, ensuring thorough code examination, dependency
144 | mapping, and execution flow analysis before reaching conclusions. It supports
145 | both precision tracing (execution flow) and dependencies tracing (structural relationships).
146 | """
147 |
148 | def __init__(self):
149 | super().__init__()
150 | self.initial_request = None
151 | self.trace_config = {}
152 |
153 | def get_name(self) -> str:
154 | return "tracer"
155 |
156 | def get_description(self) -> str:
157 | return (
158 | "Performs systematic code tracing with modes for execution flow or dependency mapping. "
159 | "Use for method execution analysis, call chain tracing, dependency mapping, and architectural understanding. "
160 | "Supports precision mode (execution flow) and dependencies mode (structural relationships)."
161 | )
162 |
163 | def get_system_prompt(self) -> str:
164 | return TRACER_PROMPT
165 |
166 | def get_default_temperature(self) -> float:
167 | return TEMPERATURE_ANALYTICAL
168 |
169 | def get_model_category(self) -> "ToolModelCategory":
170 | """Tracer requires analytical reasoning for code analysis"""
171 | from tools.models import ToolModelCategory
172 |
173 | return ToolModelCategory.EXTENDED_REASONING
174 |
175 | def requires_model(self) -> bool:
176 | """
177 | Tracer tool doesn't require model resolution at the MCP boundary.
178 |
179 | The tracer is a structured workflow tool that organizes tracing steps
180 | and provides detailed output formatting guidance without calling external AI models.
181 |
182 | Returns:
183 | bool: False - tracer doesn't need AI model access
184 | """
185 | return False
186 |
187 | def get_workflow_request_model(self):
188 | """Return the tracer-specific request model."""
189 | return TracerRequest
190 |
191 | def get_tool_fields(self) -> dict[str, dict[str, Any]]:
192 | """Return tracing-specific field definitions beyond the standard workflow fields."""
193 | return {
194 | # Tracer-specific fields
195 | "trace_mode": {
196 | "type": "string",
197 | "enum": ["precision", "dependencies", "ask"],
198 | "description": TRACER_WORKFLOW_FIELD_DESCRIPTIONS["trace_mode"],
199 | },
200 | "target_description": {
201 | "type": "string",
202 | "description": TRACER_WORKFLOW_FIELD_DESCRIPTIONS["target_description"],
203 | },
204 | "images": {
205 | "type": "array",
206 | "items": {"type": "string"},
207 | "description": TRACER_WORKFLOW_FIELD_DESCRIPTIONS["images"],
208 | },
209 | }
210 |
211 | def get_input_schema(self) -> dict[str, Any]:
212 | """Generate input schema using WorkflowSchemaBuilder with field exclusion."""
213 | from .workflow.schema_builders import WorkflowSchemaBuilder
214 |
215 | # Exclude investigation-specific fields that tracing doesn't need
216 | excluded_workflow_fields = [
217 | "issues_found", # Tracing doesn't track issues
218 | "hypothesis", # Tracing doesn't use hypothesis
219 | ]
220 |
221 | # Exclude common fields that tracing doesn't need
222 | excluded_common_fields = [
223 | "temperature", # Tracing doesn't need temperature control
224 | "thinking_mode", # Tracing doesn't need thinking mode
225 | "absolute_file_paths", # Tracing uses relevant_files instead
226 | ]
227 |
228 | return WorkflowSchemaBuilder.build_schema(
229 | tool_specific_fields=self.get_tool_fields(),
230 | required_fields=["target_description", "trace_mode"], # Step 1 requires these
231 | model_field_schema=self.get_model_field_schema(),
232 | auto_mode=self.is_effective_auto_mode(),
233 | tool_name=self.get_name(),
234 | excluded_workflow_fields=excluded_workflow_fields,
235 | excluded_common_fields=excluded_common_fields,
236 | )
237 |
238 | # ================================================================================
239 | # Abstract Methods - Required Implementation from BaseWorkflowMixin
240 | # ================================================================================
241 |
242 | def get_required_actions(
243 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
244 | ) -> list[str]:
245 | """Define required actions for each tracing phase."""
246 | if step_number == 1:
247 | # Check if we're in ask mode and need to prompt for mode selection
248 | if self.get_trace_mode() == "ask":
249 | return [
250 | "MUST ask user to choose between precision or dependencies mode",
251 | "Explain precision mode: traces execution flow, call chains, and usage patterns (best for methods/functions)",
252 | "Explain dependencies mode: maps structural relationships and bidirectional dependencies (best for classes/modules)",
253 | "Wait for user's mode selection before proceeding with investigation",
254 | ]
255 |
256 | # Initial tracing investigation tasks (when mode is already selected)
257 | return [
258 | "Search for and locate the target method/function/class/module in the codebase",
259 | "Read and understand the implementation of the target code",
260 | "Identify the file location, complete signature, and basic structure",
261 | "Begin mapping immediate relationships (what it calls, what calls it)",
262 | "Understand the context and purpose of the target code",
263 | ]
264 | elif confidence in ["exploring", "low"]:
265 | # Need deeper investigation
266 | return [
267 | "Trace deeper into the execution flow or dependency relationships",
268 | "Examine how the target code is used throughout the codebase",
269 | "Map additional layers of dependencies or call chains",
270 | "Look for conditional execution paths, error handling, and edge cases",
271 | "Understand the broader architectural context and patterns",
272 | ]
273 | elif confidence in ["medium", "high"]:
274 | # Close to completion - need final verification
275 | return [
276 | "Verify completeness of the traced relationships and execution paths",
277 | "Check for any missed dependencies, usage patterns, or execution branches",
278 | "Confirm understanding of side effects, state changes, and external interactions",
279 | "Validate that the tracing covers all significant code relationships",
280 | "Prepare comprehensive findings for final output formatting",
281 | ]
282 | else:
283 | # General investigation needed
284 | return [
285 | "Continue systematic tracing of code relationships and execution paths",
286 | "Gather more evidence using appropriate code analysis techniques",
287 | "Test assumptions about code behavior and dependency relationships",
288 | "Look for patterns that enhance understanding of the code structure",
289 | "Focus on areas that haven't been thoroughly traced yet",
290 | ]
291 |
292 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
293 | """Tracer is self-contained and doesn't need expert analysis."""
294 | return False
295 |
296 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
297 | """Tracer doesn't use expert analysis."""
298 | return ""
299 |
300 | def requires_expert_analysis(self) -> bool:
301 | """Tracer is self-contained like the planner tool."""
302 | return False
303 |
304 | # ================================================================================
305 | # Workflow Customization - Match Planner Behavior
306 | # ================================================================================
307 |
308 | def prepare_step_data(self, request) -> dict:
309 | """
310 | Prepare step data from request with tracer-specific fields.
311 | """
312 | step_data = {
313 | "step": request.step,
314 | "step_number": request.step_number,
315 | "findings": request.findings,
316 | "files_checked": request.files_checked,
317 | "relevant_files": request.relevant_files,
318 | "relevant_context": request.relevant_context,
319 | "issues_found": [], # Tracer doesn't track issues
320 | "confidence": request.confidence or "exploring",
321 | "hypothesis": None, # Tracer doesn't use hypothesis
322 | "images": request.images or [],
323 | # Tracer-specific fields
324 | "trace_mode": request.trace_mode,
325 | "target_description": request.target_description,
326 | }
327 | return step_data
328 |
329 | def build_base_response(self, request, continuation_id: str = None) -> dict:
330 | """
331 | Build the base response structure with tracer-specific fields.
332 | """
333 | # Use work_history from workflow mixin for consistent step tracking
334 | current_step_count = len(self.work_history) + 1
335 |
336 | response_data = {
337 | "status": f"{self.get_name()}_in_progress",
338 | "step_number": request.step_number,
339 | "total_steps": request.total_steps,
340 | "next_step_required": request.next_step_required,
341 | "step_content": request.step,
342 | f"{self.get_name()}_status": {
343 | "files_checked": len(self.consolidated_findings.files_checked),
344 | "relevant_files": len(self.consolidated_findings.relevant_files),
345 | "relevant_context": len(self.consolidated_findings.relevant_context),
346 | "issues_found": len(self.consolidated_findings.issues_found),
347 | "images_collected": len(self.consolidated_findings.images),
348 | "current_confidence": self.get_request_confidence(request),
349 | "step_history_length": current_step_count,
350 | },
351 | "metadata": {
352 | "trace_mode": self.trace_config.get("trace_mode", "unknown"),
353 | "target_description": self.trace_config.get("target_description", ""),
354 | "step_history_length": current_step_count,
355 | },
356 | }
357 |
358 | if continuation_id:
359 | response_data["continuation_id"] = continuation_id
360 |
361 | return response_data
362 |
363 | def handle_work_continuation(self, response_data: dict, request) -> dict:
364 | """
365 | Handle work continuation with tracer-specific guidance.
366 | """
367 | response_data["status"] = f"pause_for_{self.get_name()}"
368 | response_data[f"{self.get_name()}_required"] = True
369 |
370 | # Get tracer-specific required actions
371 | required_actions = self.get_required_actions(
372 | request.step_number, request.confidence or "exploring", request.findings, request.total_steps
373 | )
374 | response_data["required_actions"] = required_actions
375 |
376 | # Generate step-specific guidance
377 | if request.step_number == 1:
378 | # Check if we're in ask mode and need to prompt for mode selection
379 | if self.get_trace_mode() == "ask":
380 | response_data["next_steps"] = (
381 | f"STOP! You MUST ask the user to choose a tracing mode before proceeding. "
382 | f"Present these options clearly:\\n\\n"
383 | f"**PRECISION MODE**: Traces execution flow, call chains, and usage patterns. "
384 | f"Best for understanding how a specific method or function works, what it calls, "
385 | f"and how data flows through the execution path.\\n\\n"
386 | f"**DEPENDENCIES MODE**: Maps structural relationships and bidirectional dependencies. "
387 | f"Best for understanding how a class or module relates to other components, "
388 | f"what depends on it, and what it depends on.\\n\\n"
389 | f"After the user selects a mode, call {self.get_name()} again with step_number: 1 "
390 | f"but with the chosen trace_mode (either 'precision' or 'dependencies')."
391 | )
392 | else:
393 | response_data["next_steps"] = (
394 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first investigate "
395 | f"the codebase to understand the target code. CRITICAL AWARENESS: You need to find and understand "
396 | f"the target method/function/class/module, examine its implementation, and begin mapping its "
397 | f"relationships. Use file reading tools, code search, and systematic examination to gather "
398 | f"comprehensive information about the target. Only call {self.get_name()} again AFTER completing "
399 | f"your investigation. When you call {self.get_name()} next time, use step_number: {request.step_number + 1} "
400 | f"and report specific files examined, code structure discovered, and initial relationship findings."
401 | )
402 | elif request.confidence in ["exploring", "low"]:
403 | next_step = request.step_number + 1
404 | response_data["next_steps"] = (
405 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
406 | f"deeper tracing analysis. MANDATORY ACTIONS before calling {self.get_name()} step {next_step}:\\n"
407 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
408 | + f"\\n\\nOnly call {self.get_name()} again with step_number: {next_step} AFTER "
409 | + "completing these tracing investigations."
410 | )
411 | elif request.confidence in ["medium", "high"]:
412 | next_step = request.step_number + 1
413 | response_data["next_steps"] = (
414 | f"WAIT! Your tracing analysis needs final verification. DO NOT call {self.get_name()} immediately. "
415 | f"REQUIRED ACTIONS:\\n"
416 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
417 | + f"\\n\\nREMEMBER: Ensure you have traced all significant relationships and execution paths. "
418 | f"Document findings with specific file references and method signatures, then call {self.get_name()} "
419 | f"with step_number: {next_step}."
420 | )
421 | else:
422 | # General investigation needed
423 | next_step = request.step_number + 1
424 | remaining_steps = request.total_steps - request.step_number
425 | response_data["next_steps"] = (
426 | f"Continue systematic tracing with step {next_step}. Approximately {remaining_steps} steps remaining. "
427 | f"Focus on deepening your understanding of the code relationships and execution patterns."
428 | )
429 |
430 | return response_data
431 |
432 | def customize_workflow_response(self, response_data: dict, request) -> dict:
433 | """
434 | Customize response to match tracer tool format with output instructions.
435 | """
436 | # Store trace configuration on first step
437 | if request.step_number == 1:
438 | self.initial_request = request.step
439 | self.trace_config = {
440 | "trace_mode": request.trace_mode,
441 | "target_description": request.target_description,
442 | }
443 |
444 | # Update metadata with trace configuration
445 | if "metadata" in response_data:
446 | response_data["metadata"]["trace_mode"] = request.trace_mode or "unknown"
447 | response_data["metadata"]["target_description"] = request.target_description or ""
448 |
449 | # If in ask mode, mark this as mode selection phase
450 | if request.trace_mode == "ask":
451 | response_data["mode_selection_required"] = True
452 | response_data["status"] = "mode_selection_required"
453 |
454 | # Add tracer-specific output instructions for final steps
455 | if not request.next_step_required:
456 | response_data["tracing_complete"] = True
457 | response_data["trace_summary"] = f"TRACING COMPLETE: {request.step}"
458 |
459 | # Get mode-specific output instructions
460 | trace_mode = self.trace_config.get("trace_mode", "precision")
461 | rendering_instructions = self._get_rendering_instructions(trace_mode)
462 |
463 | response_data["output"] = {
464 | "instructions": (
465 | "This is a structured tracing analysis response. Present the comprehensive tracing findings "
466 | "using the specific rendering format for the trace mode. Follow the exact formatting guidelines "
467 | "provided in rendering_instructions. Include all discovered relationships, execution paths, "
468 | "and dependencies with precise file references and line numbers."
469 | ),
470 | "format": f"{trace_mode}_trace_analysis",
471 | "rendering_instructions": rendering_instructions,
472 | "presentation_guidelines": {
473 | "completed_trace": (
474 | "Use the exact rendering format specified for the trace mode. Include comprehensive "
475 | "diagrams, tables, and structured analysis. Reference specific file paths and line numbers. "
476 | "Follow formatting rules precisely."
477 | ),
478 | "step_content": "Present as main analysis with clear structure and actionable insights.",
479 | "continuation": "Use continuation_id for related tracing sessions or follow-up analysis",
480 | },
481 | }
482 | response_data["next_steps"] = (
483 | f"Tracing analysis complete. Present the comprehensive {trace_mode} trace analysis to the user "
484 | f"using the exact rendering format specified in the output instructions. Follow the formatting "
485 | f"guidelines precisely, including diagrams, tables, and file references. After presenting the "
486 | f"analysis, offer to help with related tracing tasks or use the continuation_id for follow-up analysis."
487 | )
488 |
489 | # Convert generic status names to tracer-specific ones
490 | tool_name = self.get_name()
491 | status_mapping = {
492 | f"{tool_name}_in_progress": "tracing_in_progress",
493 | f"pause_for_{tool_name}": "pause_for_tracing",
494 | f"{tool_name}_required": "tracing_required",
495 | f"{tool_name}_complete": "tracing_complete",
496 | }
497 |
498 | if response_data["status"] in status_mapping:
499 | response_data["status"] = status_mapping[response_data["status"]]
500 |
501 | return response_data
502 |
503 | def _get_rendering_instructions(self, trace_mode: str) -> str:
504 | """
505 | Get mode-specific rendering instructions for the CLI agent.
506 |
507 | Args:
508 | trace_mode: Either "precision" or "dependencies"
509 |
510 | Returns:
511 | str: Complete rendering instructions for the specified mode
512 | """
513 | if trace_mode == "precision":
514 | return self._get_precision_rendering_instructions()
515 | else: # dependencies mode
516 | return self._get_dependencies_rendering_instructions()
517 |
518 | def _get_precision_rendering_instructions(self) -> str:
519 | """Get rendering instructions for precision trace mode."""
520 | return """
521 | ## MANDATORY RENDERING INSTRUCTIONS FOR PRECISION TRACE
522 |
523 | You MUST render the trace analysis using ONLY the Vertical Indented Flow Style:
524 |
525 | ### CALL FLOW DIAGRAM - Vertical Indented Style
526 |
527 | **EXACT FORMAT TO FOLLOW:**
528 | ```
529 | [ClassName::MethodName] (file: /complete/file/path.ext, line: ##)
530 | ↓
531 | [AnotherClass::calledMethod] (file: /path/to/file.ext, line: ##)
532 | ↓
533 | [ThirdClass::nestedMethod] (file: /path/file.ext, line: ##)
534 | ↓
535 | [DeeperClass::innerCall] (file: /path/inner.ext, line: ##) ? if some_condition
536 | ↓
537 | [ServiceClass::processData] (file: /services/service.ext, line: ##)
538 | ↓
539 | [RepositoryClass::saveData] (file: /data/repo.ext, line: ##)
540 | ↓
541 | [ClientClass::sendRequest] (file: /clients/client.ext, line: ##)
542 | ↓
543 | [EmailService::sendEmail] (file: /email/service.ext, line: ##) ⚠️ ambiguous branch
544 | →
545 | [SMSService::sendSMS] (file: /sms/service.ext, line: ##) ⚠️ ambiguous branch
546 | ```
547 |
548 | **CRITICAL FORMATTING RULES:**
549 |
550 | 1. **Method Names**: Use the actual naming convention of the project language you're analyzing. Automatically detect and adapt to the project's conventions (camelCase, snake_case, PascalCase, etc.) based on the codebase structure and file extensions.
551 |
552 | 2. **Vertical Flow Arrows**:
553 | - Use `↓` for standard sequential calls (vertical flow)
554 | - Use `→` for parallel/alternative calls (horizontal branch)
555 | - NEVER use other arrow types
556 |
557 | 3. **Indentation Logic**:
558 | - Start at column 0 for entry point
559 | - Indent 2 spaces for each nesting level
560 | - Maintain consistent indentation for same call depth
561 | - Sibling calls at same level should have same indentation
562 |
563 | 4. **Conditional Calls**:
564 | - Add `? if condition_description` after method for conditional execution
565 | - Use actual condition names from code when possible
566 |
567 | 5. **Ambiguous Branches**:
568 | - Mark with `⚠️ ambiguous branch` when execution path is uncertain
569 | - Use `→` to show alternative paths at same indentation level
570 |
571 | 6. **File Path Format**:
572 | - Use complete relative paths from project root
573 | - Include actual file extensions from the project
574 | - Show exact line numbers where method is defined
575 |
576 | ### ADDITIONAL ANALYSIS VIEWS
577 |
578 | **1. BRANCHING & SIDE EFFECT TABLE**
579 |
580 | | Location | Condition | Branches | Uncertain |
581 | |----------|-----------|----------|-----------|
582 | | CompleteFileName.ext:## | if actual_condition_from_code | method1(), method2(), else skip | No |
583 | | AnotherFile.ext:## | if boolean_check | callMethod(), else return | No |
584 | | ThirdFile.ext:## | if validation_passes | processData(), else throw | Yes |
585 |
586 | **2. SIDE EFFECTS**
587 | ```
588 | Side Effects:
589 | - [database] Specific database operation description (CompleteFileName.ext:##)
590 | - [network] Specific network call description (CompleteFileName.ext:##)
591 | - [filesystem] Specific file operation description (CompleteFileName.ext:##)
592 | - [state] State changes or property modifications (CompleteFileName.ext:##)
593 | - [memory] Memory allocation or cache operations (CompleteFileName.ext:##)
594 | ```
595 |
596 | **3. USAGE POINTS**
597 | ```
598 | Usage Points:
599 | 1. FileName.ext:## - Context description of where/why it's called
600 | 2. AnotherFile.ext:## - Context description of usage scenario
601 | 3. ThirdFile.ext:## - Context description of calling pattern
602 | 4. FourthFile.ext:## - Context description of integration point
603 | ```
604 |
605 | **4. ENTRY POINTS**
606 | ```
607 | Entry Points:
608 | - ClassName::methodName (context: where this flow typically starts)
609 | - AnotherClass::entryMethod (context: alternative entry scenario)
610 | - ThirdClass::triggerMethod (context: event-driven entry point)
611 | ```
612 |
613 | **ABSOLUTE REQUIREMENTS:**
614 | - Use ONLY the vertical indented style for the call flow diagram
615 | - Present ALL FOUR additional analysis views (Branching Table, Side Effects, Usage Points, Entry Points)
616 | - Adapt method naming to match the project's programming language conventions
617 | - Use exact file paths and line numbers from the actual codebase
618 | - DO NOT invent or guess method names or locations
619 | - Follow indentation rules precisely for call hierarchy
620 | - Mark uncertain execution paths clearly
621 | - Provide contextual descriptions in Usage Points and Entry Points sections
622 | - Include comprehensive side effects categorization (database, network, filesystem, state, memory)"""
623 |
624 | def _get_dependencies_rendering_instructions(self) -> str:
625 | """Get rendering instructions for dependencies trace mode."""
626 | return """
627 | ## MANDATORY RENDERING INSTRUCTIONS FOR DEPENDENCIES TRACE
628 |
629 | You MUST render the trace analysis using ONLY the Bidirectional Arrow Flow Style:
630 |
631 | ### DEPENDENCY FLOW DIAGRAM - Bidirectional Arrow Style
632 |
633 | **EXACT FORMAT TO FOLLOW:**
634 | ```
635 | INCOMING DEPENDENCIES → [TARGET_CLASS/MODULE] → OUTGOING DEPENDENCIES
636 |
637 | CallerClass::callerMethod ←────┐
638 | AnotherCaller::anotherMethod ←─┤
639 | ThirdCaller::thirdMethod ←─────┤
640 | │
641 | [TARGET_CLASS/MODULE]
642 | │
643 | ├────→ FirstDependency::method
644 | ├────→ SecondDependency::method
645 | └────→ ThirdDependency::method
646 |
647 | TYPE RELATIONSHIPS:
648 | InterfaceName ──implements──→ [TARGET_CLASS] ──extends──→ BaseClass
649 | DTOClass ──uses──→ [TARGET_CLASS] ──uses──→ EntityClass
650 | ```
651 |
652 | **CRITICAL FORMATTING RULES:**
653 |
654 | 1. **Target Placement**: Always place the target class/module in square brackets `[TARGET_NAME]` at the center
655 | 2. **Incoming Dependencies**: Show on the left side with `←` arrows pointing INTO the target
656 | 3. **Outgoing Dependencies**: Show on the right side with `→` arrows pointing OUT FROM the target
657 | 4. **Arrow Alignment**: Use consistent spacing and alignment for visual clarity
658 | 5. **Method Naming**: Use the project's actual naming conventions detected from the codebase
659 | 6. **File References**: Include complete file paths and line numbers
660 |
661 | **VISUAL LAYOUT RULES:**
662 |
663 | 1. **Header Format**: Always start with the flow direction indicator
664 | 2. **Left Side (Incoming)**:
665 | - List all callers with `←` arrows
666 | - Use `┐`, `┤`, `┘` box drawing characters for clean connection lines
667 | - Align arrows consistently
668 |
669 | 3. **Center (Target)**:
670 | - Enclose target in square brackets
671 | - Position centrally between incoming and outgoing
672 |
673 | 4. **Right Side (Outgoing)**:
674 | - List all dependencies with `→` arrows
675 | - Use `├`, `└` box drawing characters for branching
676 | - Maintain consistent spacing
677 |
678 | 5. **Type Relationships Section**:
679 |    - Use the `──relationship──→` format, with horizontal line characters (`──`) on either side of the relationship label
680 | - Show inheritance, implementation, and usage relationships
681 | - Place below the main flow diagram
682 |
683 | **DEPENDENCY TABLE:**
684 |
685 | | Type | From/To | Method | File | Line |
686 | |------|---------|--------|------|------|
687 | | incoming_call | From: CallerClass | callerMethod | /complete/path/file.ext | ## |
688 | | outgoing_call | To: TargetClass | targetMethod | /complete/path/file.ext | ## |
689 | | implements | Self: ThisClass | — | /complete/path/file.ext | — |
690 | | extends | Self: ThisClass | — | /complete/path/file.ext | — |
691 | | uses_type | Self: ThisClass | — | /complete/path/file.ext | — |
692 |
693 | **ABSOLUTE REQUIREMENTS:**
694 | - Use ONLY the bidirectional arrow flow style shown above
695 | - Automatically detect and use the project's naming conventions
696 | - Use exact file paths and line numbers from the actual codebase
697 | - DO NOT invent or guess method/class names
698 | - Maintain visual alignment and consistent spacing
699 | - Include type relationships section when applicable
700 | - Show clear directional flow with proper arrows"""
701 |
702 | # ================================================================================
703 | # Hook Method Overrides for Tracer-Specific Behavior
704 | # ================================================================================
705 |
706 | def get_completion_status(self) -> str:
707 | """Tracer uses tracing-specific status."""
708 | return "tracing_complete"
709 |
710 | def get_completion_data_key(self) -> str:
711 | """Tracer uses 'complete_tracing' key."""
712 | return "complete_tracing"
713 |
714 | def get_completion_message(self) -> str:
715 | """Tracer-specific completion message."""
716 | return (
717 | "Tracing analysis complete. Present the comprehensive trace analysis to the user "
718 | "using the specified rendering format and offer to help with related tracing tasks."
719 | )
720 |
721 | def get_skip_reason(self) -> str:
722 | """Tracer-specific skip reason."""
723 | return "Tracer is self-contained and completes analysis without external assistance"
724 |
725 | def get_skip_expert_analysis_status(self) -> str:
726 | """Tracer-specific expert analysis skip status."""
727 | return "skipped_by_tool_design"
728 |
729 | def store_initial_issue(self, step_description: str):
730 | """Store initial tracing description."""
731 | self.initial_tracing_description = step_description
732 |
733 | def get_initial_request(self, fallback_step: str) -> str:
734 | """Get initial tracing description."""
735 | try:
736 | return self.initial_tracing_description
737 | except AttributeError:
738 | return fallback_step
739 |
740 | def get_request_confidence(self, request) -> str:
741 | """Get confidence from request for tracer workflow."""
742 | try:
743 | return request.confidence or "exploring"
744 | except AttributeError:
745 | return "exploring"
746 |
747 | def get_trace_mode(self) -> str:
748 | """Get current trace mode. Override for custom trace mode handling."""
749 | try:
750 | return self.trace_config.get("trace_mode", "ask")
751 | except AttributeError:
752 | return "ask"
753 |
754 | # Required abstract methods from BaseTool
755 | def get_request_model(self):
756 | """Return the tracer-specific request model."""
757 | return TracerRequest
758 |
759 | async def prepare_prompt(self, request) -> str:
760 | """Not used - workflow tools use execute_workflow()."""
761 | return "" # Workflow tools use execute_workflow() directly
762 |
```
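
The hook overrides above only return strings; the workflow machinery that consumes them (for example `execute_workflow()`) is defined elsewhere and is not shown on this page. Purely as an illustration of the pattern, and not as the server's actual implementation, the sketch below shows how a hypothetical orchestrator could duck-type against these hooks to assemble a completion payload. Everything other than the five hook names themselves (`WorkflowHooks`, `build_completion_payload`, the payload keys) is an assumption for illustration.

```python
# Hypothetical sketch only: not the project's workflow base class.
from typing import Any, Protocol


class WorkflowHooks(Protocol):
    """Structural view of the completion/skip hooks a workflow tool exposes."""

    def get_completion_status(self) -> str: ...
    def get_completion_data_key(self) -> str: ...
    def get_completion_message(self) -> str: ...
    def get_skip_reason(self) -> str: ...
    def get_skip_expert_analysis_status(self) -> str: ...


def build_completion_payload(tool: WorkflowHooks, findings: dict[str, Any]) -> dict[str, Any]:
    """Assemble a final response using only the values the tool's hooks provide."""
    payload: dict[str, Any] = {
        "status": tool.get_completion_status(),      # e.g. "tracing_complete"
        tool.get_completion_data_key(): findings,    # e.g. "complete_tracing"
        "message": tool.get_completion_message(),
    }
    # The tracer skips external expert analysis by design, so record why.
    payload["expert_analysis"] = {
        "status": tool.get_skip_expert_analysis_status(),  # "skipped_by_tool_design"
        "reason": tool.get_skip_reason(),
    }
    return payload
```

Because `TracerTool` defines all of these hooks, an instance would satisfy such a protocol structurally; the real payload shape produced by the workflow base class may differ.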
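
A side note on the fallback accessors (`get_initial_request`, `get_request_confidence`, `get_trace_mode`): the `try/except AttributeError` pattern they use can also be written with `getattr` defaults. The sketch below is not the project's code, only an illustration of that alternative; it assumes the same attribute names and that `trace_config`, when set, is a dict.

```python
from typing import Any


class FallbackAccessorSketch:
    """Hypothetical rewrite of the fallback accessors using getattr defaults."""

    def get_initial_request(self, fallback_step: str) -> str:
        # Falls back when the attribute was never stored.
        return getattr(self, "initial_tracing_description", fallback_step)

    def get_request_confidence(self, request: Any) -> str:
        # Treats a missing or empty confidence the same way as the original.
        return getattr(request, "confidence", None) or "exploring"

    def get_trace_mode(self) -> str:
        # Unlike the broad except in the original, this assumes trace_config,
        # when present, is a dict with a .get() method.
        return getattr(self, "trace_config", {}).get("trace_mode", "ask")
```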