This is page 21 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
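To pull the whole export into one file for offline review, here is a minimal sketch assuming the pagination scheme above (25 pages, with `lines=true` toggling line numbers); the output filename is illustrative:

```python
# Fetch all pages of the codebase.md export and concatenate them.
# Assumes pages 1-25 as stated above; "lines" mirrors the ?lines=true toggle.
import requests

BASE_URL = "http://codebase.md/beehiveinnovations/gemini-mcp-server"

def fetch_all_pages(total_pages: int = 25, lines: bool = True) -> str:
    chunks = []
    for page in range(1, total_pages + 1):
        resp = requests.get(BASE_URL, params={"lines": str(lines).lower(), "page": page})
        resp.raise_for_status()
        chunks.append(resp.text)
    return "\n".join(chunks)

if __name__ == "__main__":
    # Illustrative output filename.
    with open("codebase_full.md", "w", encoding="utf-8") as out:
        out.write(fetch_all_pages())
```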
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│   ├── docker-pr.yml
│   ├── docker-release.yml
│   ├── semantic-pr.yml
│   ├── semantic-release.yml
│   └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│   ├── build.ps1
│   ├── build.sh
│   ├── deploy.ps1
│   ├── deploy.sh
│   └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │   └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │   └── mldev.json
│ │ └── consensus
│ │   └── step2_gemini25_flash_against
│ │     └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│   ├── __init__.py
│   ├── base.py
│   ├── schema_builders.py
│   └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
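The `simulator_tests/` files listed above follow a single convention, visible in the two files reproduced below: each test subclasses `ConversationBaseTest`, exposes `test_name` and `test_description` properties, and implements `run_test() -> bool`. A hypothetical mini-runner sketching that contract (constructor arguments are omitted here and may differ in the real harness):

```python
# Hypothetical runner for the simulator-test convention shown below.
# Assumption: the test classes can be constructed without arguments.
from simulator_tests.test_analyze_validation import AnalyzeValidationTest
from simulator_tests.test_refactor_validation import RefactorValidationTest

def run_suite(test_classes) -> bool:
    all_passed = True
    for cls in test_classes:
        test = cls()  # assumed no-arg constructor
        print(f"{test.test_name}: {test.test_description}")
        all_passed &= test.run_test()  # run_test() returns True on success
    return all_passed

if __name__ == "__main__":
    ok = run_suite([AnalyzeValidationTest, RefactorValidationTest])
    raise SystemExit(0 if ok else 1)
```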
# Files
--------------------------------------------------------------------------------
/simulator_tests/test_analyze_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Analyze Tool Validation Test
4 |
5 | Tests the analyze tool's capabilities using the new workflow architecture.
6 | This validates that the new workflow-based implementation provides step-by-step
7 | analysis with expert validation, following the same patterns as the debug and codereview tools.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class AnalyzeValidationTest(ConversationBaseTest):
17 | """Test analyze tool with new workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "analyze_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "AnalyzeWorkflow tool validation with new workflow architecture"
26 |
27 | def run_test(self) -> bool:
28 | """Test analyze tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: AnalyzeWorkflow tool validation (new architecture)")
34 |
35 | # Create test files for analysis
36 | self._create_analysis_codebase()
37 |
38 | # Test 1: Single analysis session with multiple steps
39 | if not self._test_single_analysis_session():
40 | return False
41 |
42 | # Test 2: Analysis flow that requires refocusing
43 | if not self._test_analysis_refocus_flow():
44 | return False
45 |
46 | # Test 3: Complete analysis with expert validation
47 | if not self._test_complete_analysis_with_expert():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Different analysis types
59 | if not self._test_analysis_types():
60 | return False
61 |
62 | self.logger.info(" ✅ All analyze validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"AnalyzeWorkflow validation test failed: {e}")
67 | return False
68 |
69 | def _create_analysis_codebase(self):
70 | """Create test files representing a realistic codebase for analysis"""
71 | # Create a Python microservice with various architectural patterns
72 | main_service = """#!/usr/bin/env python3
73 | import asyncio
74 | import json
75 | from datetime import datetime
76 | from typing import Dict, List, Optional
77 |
78 | from fastapi import FastAPI, HTTPException, Depends
79 | from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
80 | from sqlalchemy.orm import sessionmaker
81 | import redis
82 | import logging
83 |
84 | # Global configurations - could be improved
85 | DATABASE_URL = "postgresql://user:pass@localhost/db"
86 | REDIS_URL = "redis://localhost:6379"
87 |
88 | app = FastAPI(title="User Management Service")
89 |
90 | # Database setup
91 | engine = create_async_engine(DATABASE_URL, echo=True)
92 | AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
93 |
94 | # Redis connection - potential singleton pattern issue
95 | redis_client = redis.Redis.from_url(REDIS_URL)
96 |
97 | class UserService:
98 | def __init__(self, db: AsyncSession):
99 | self.db = db
100 | self.cache = redis_client # Direct dependency on global
101 |
102 | async def get_user(self, user_id: int) -> Optional[Dict]:
103 | # Cache key generation - could be centralized
104 | cache_key = f"user:{user_id}"
105 |
106 | # Check cache first
107 | cached = self.cache.get(cache_key)
108 | if cached:
109 | return json.loads(cached)
110 |
111 | # Database query - no error handling
112 | result = await self.db.execute(
113 | "SELECT * FROM users WHERE id = %s", (user_id,)
114 | )
115 |         user_data = result.fetchone()
116 |         if user_data:  # Cache for 1 hour - magic number
117 | self.cache.setex(cache_key, 3600, json.dumps(user_data, ensure_ascii=False))
118 |
119 | return user_data
120 |
121 | async def create_user(self, user_data: Dict) -> Dict:
122 | # Input validation missing
123 | # No transaction handling
124 | # No audit logging
125 |
126 | query = "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id"
127 | result = await self.db.execute(query, (user_data['name'], user_data['email']))
128 | user_id = result.fetchone()[0]
129 |
130 | # Cache invalidation strategy missing
131 |
132 | return {"id": user_id, **user_data}
133 |
134 | @app.get("/users/{user_id}")
135 | async def get_user_endpoint(user_id: int, db: AsyncSession = Depends(get_db)):
136 | service = UserService(db)
137 | user = await service.get_user(user_id)
138 |
139 | if not user:
140 | raise HTTPException(status_code=404, detail="User not found")
141 |
142 | return user
143 |
144 | @app.post("/users")
145 | async def create_user_endpoint(user_data: dict, db: AsyncSession = Depends(get_db)):
146 | service = UserService(db)
147 | return await service.create_user(user_data)
148 |
149 | async def get_db():
150 | async with AsyncSessionLocal() as session:
151 | yield session
152 | """
153 |
154 | # Create config module with various architectural concerns
155 | config_module = """#!/usr/bin/env python3
156 | import os
157 | from dataclasses import dataclass
158 | from typing import Optional
159 |
160 | # Configuration approach could be improved
161 | @dataclass
162 | class DatabaseConfig:
163 | url: str = os.getenv("DATABASE_URL", "postgresql://localhost/app")
164 | pool_size: int = int(os.getenv("DB_POOL_SIZE", "5"))
165 | max_overflow: int = int(os.getenv("DB_MAX_OVERFLOW", "10"))
166 | echo: bool = os.getenv("DB_ECHO", "false").lower() == "true"
167 |
168 | @dataclass
169 | class CacheConfig:
170 | redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379")
171 | default_ttl: int = int(os.getenv("CACHE_TTL", "3600"))
172 | max_connections: int = int(os.getenv("REDIS_MAX_CONN", "20"))
173 |
174 | @dataclass
175 | class AppConfig:
176 | environment: str = os.getenv("ENVIRONMENT", "development")
177 | debug: bool = os.getenv("DEBUG", "false").lower() == "true"
178 | log_level: str = os.getenv("LOG_LEVEL", "INFO")
179 |
180 | # Nested config objects
181 | database: DatabaseConfig = DatabaseConfig()
182 | cache: CacheConfig = CacheConfig()
183 |
184 | # Security settings scattered
185 | secret_key: str = os.getenv("SECRET_KEY", "dev-key-not-secure")
186 | jwt_algorithm: str = "HS256"
187 | jwt_expiration: int = 86400 # 24 hours
188 |
189 | def __post_init__(self):
190 | # Validation logic could be centralized
191 | if self.environment == "production" and self.secret_key == "dev-key-not-secure":
192 | raise ValueError("Production environment requires secure secret key")
193 |
194 | # Global configuration instance - potential issues
195 | config = AppConfig()
196 |
197 | # Helper functions that could be methods
198 | def get_database_url() -> str:
199 | return config.database.url
200 |
201 | def get_cache_config() -> dict:
202 | return {
203 | "url": config.cache.redis_url,
204 | "ttl": config.cache.default_ttl,
205 | "max_connections": config.cache.max_connections
206 | }
207 |
208 | def is_production() -> bool:
209 | return config.environment == "production"
210 |
211 | def should_enable_debug() -> bool:
212 | return config.debug and not is_production()
213 | """
214 |
215 | # Create models module with database concerns
216 | models_module = """#!/usr/bin/env python3
217 | from datetime import datetime
218 | from typing import Optional, List
219 | from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, Text
220 | from sqlalchemy.ext.declarative import declarative_base
221 | from sqlalchemy.orm import relationship
222 | import json
223 |
224 | Base = declarative_base()
225 |
226 | class User(Base):
227 | __tablename__ = "users"
228 |
229 | id = Column(Integer, primary_key=True)
230 | email = Column(String(255), unique=True, nullable=False)
231 | name = Column(String(255), nullable=False)
232 | is_active = Column(Boolean, default=True)
233 | created_at = Column(DateTime, default=datetime.utcnow)
234 | updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
235 |
236 | # Relationship could be optimized
237 | profiles = relationship("UserProfile", back_populates="user", lazy="select")
238 | audit_logs = relationship("AuditLog", back_populates="user")
239 |
240 | def to_dict(self) -> dict:
241 | # Serialization logic mixed with model - could be separated
242 | return {
243 | "id": self.id,
244 | "email": self.email,
245 | "name": self.name,
246 | "is_active": self.is_active,
247 | "created_at": self.created_at.isoformat() if self.created_at else None,
248 | "updated_at": self.updated_at.isoformat() if self.updated_at else None
249 | }
250 |
251 | def update_from_dict(self, data: dict):
252 | # Update logic could be more robust
253 | for key, value in data.items():
254 | if hasattr(self, key) and key not in ['id', 'created_at']:
255 | setattr(self, key, value)
256 | self.updated_at = datetime.utcnow()
257 |
258 | class UserProfile(Base):
259 | __tablename__ = "user_profiles"
260 |
261 | id = Column(Integer, primary_key=True)
262 | user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
263 | bio = Column(Text)
264 | avatar_url = Column(String(500))
265 | preferences = Column(Text) # JSON stored as text - could use JSON column
266 |
267 | user = relationship("User", back_populates="profiles")
268 |
269 | def get_preferences(self) -> dict:
270 | # JSON handling could be centralized
271 | try:
272 | return json.loads(self.preferences) if self.preferences else {}
273 | except json.JSONDecodeError:
274 |             return {}
275 |     def set_preferences(self, prefs: dict):
276 |         self.preferences = json.dumps(prefs, ensure_ascii=False)
277 | class AuditLog(Base):
278 | __tablename__ = "audit_logs"
279 |
280 | id = Column(Integer, primary_key=True)
281 | user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
282 | action = Column(String(100), nullable=False)
283 | details = Column(Text) # JSON stored as text
284 | ip_address = Column(String(45)) # IPv6 support
285 | user_agent = Column(Text)
286 | timestamp = Column(DateTime, default=datetime.utcnow)
287 |
288 | user = relationship("User", back_populates="audit_logs")
289 |
290 | @classmethod
291 | def log_action(cls, db_session, user_id: int, action: str, details: dict = None,
292 | ip_address: str = None, user_agent: str = None):
293 | # Factory method pattern - could be improved
294 | log = cls(
295 | user_id=user_id,
296 | action=action,
297 | details=json.dumps(details, ensure_ascii=False) if details else None,
298 | ip_address=ip_address,
299 | user_agent=user_agent
300 | )
301 | db_session.add(log)
302 | return log
303 | """
304 |
305 | # Create utility module with various helper functions
306 | utils_module = """#!/usr/bin/env python3
307 | import hashlib
308 | import secrets
309 | import re
310 | from datetime import datetime, timedelta
311 | from typing import Optional, Dict, Any
312 | import logging
313 |
314 | # Logging setup - could be centralized
315 | logger = logging.getLogger(__name__)
316 |
317 | class ValidationError(Exception):
318 | \"\"\"Custom exception for validation errors\"\"\"
319 | pass
320 |
321 | def validate_email(email: str) -> bool:
322 | # Email validation - could use more robust library
323 | pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
324 | return bool(re.match(pattern, email))
325 |
326 | def validate_password(password: str) -> tuple[bool, str]:
327 | # Password validation rules - could be configurable
328 | if len(password) < 8:
329 | return False, "Password must be at least 8 characters"
330 |
331 | if not re.search(r'[A-Z]', password):
332 | return False, "Password must contain uppercase letter"
333 |
334 | if not re.search(r'[a-z]', password):
335 | return False, "Password must contain lowercase letter"
336 |
337 | if not re.search(r'[0-9]', password):
338 | return False, "Password must contain number"
339 |
340 | return True, "Valid password"
341 |
342 | def hash_password(password: str) -> str:
343 | # Password hashing - could use more secure algorithm
344 | salt = secrets.token_hex(32)
345 | password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
346 | return f"{salt}:{password_hash.hex()}"
347 |
348 | def verify_password(password: str, hashed: str) -> bool:
349 | # Password verification
350 | try:
351 | salt, hash_hex = hashed.split(':', 1)
352 | password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
353 | return password_hash.hex() == hash_hex
354 | except ValueError:
355 | return False
356 |
357 | def generate_cache_key(*args, prefix: str = "", separator: str = ":") -> str:
358 | # Cache key generation - could be more sophisticated
359 | parts = [str(arg) for arg in args if arg is not None]
360 | if prefix:
361 | parts.insert(0, prefix)
362 | return separator.join(parts)
363 |
364 | def parse_datetime(date_string: str) -> Optional[datetime]:
365 | # Date parsing with multiple format support
366 | formats = [
367 | "%Y-%m-%d %H:%M:%S",
368 | "%Y-%m-%dT%H:%M:%S",
369 | "%Y-%m-%dT%H:%M:%S.%f",
370 | "%Y-%m-%d"
371 | ]
372 |
373 | for fmt in formats:
374 | try:
375 | return datetime.strptime(date_string, fmt)
376 | except ValueError:
377 | continue
378 |
379 | logger.warning(f"Unable to parse datetime: {date_string}")
380 | return None
381 |
382 | def calculate_expiry(hours: int = 24) -> datetime:
383 | # Expiry calculation - could be more flexible
384 | return datetime.utcnow() + timedelta(hours=hours)
385 |
386 | def sanitize_input(data: Dict[str, Any]) -> Dict[str, Any]:
387 | # Input sanitization - basic implementation
388 | sanitized = {}
389 |
390 | for key, value in data.items():
391 | if isinstance(value, str):
392 | # Basic HTML/script tag removal
393 | value = re.sub(r'<[^>]*>', '', value)
394 | value = value.strip()
395 |
396 | # Type validation could be more comprehensive
397 | if value is not None and value != "":
398 | sanitized[key] = value
399 |
400 | return sanitized
401 |
402 | def format_response(data: Any, status: str = "success", message: str = None) -> Dict[str, Any]:
403 | # Response formatting - could be more standardized
404 | response = {
405 | "status": status,
406 | "data": data,
407 | "timestamp": datetime.utcnow().isoformat()
408 | }
409 |
410 | if message:
411 | response["message"] = message
412 |
413 | return response
414 |
415 | class PerformanceTimer:
416 | # Performance measurement utility
417 | def __init__(self, name: str):
418 | self.name = name
419 | self.start_time = None
420 |
421 | def __enter__(self):
422 | self.start_time = datetime.now()
423 | return self
424 |
425 | def __exit__(self, exc_type, exc_val, exc_tb):
426 | if self.start_time:
427 | duration = datetime.now() - self.start_time
428 | logger.info(f"Performance: {self.name} took {duration.total_seconds():.3f}s")
429 | """
430 |
431 | # Create test files
432 | self.main_service_file = self.create_additional_test_file("main_service.py", main_service)
433 | self.config_file = self.create_additional_test_file("config.py", config_module)
434 | self.models_file = self.create_additional_test_file("models.py", models_module)
435 | self.utils_file = self.create_additional_test_file("utils.py", utils_module)
436 |
437 | self.logger.info(" ✅ Created test codebase with 4 files for analysis")
438 |
439 | def _test_single_analysis_session(self) -> bool:
440 | """Test a complete analysis session with multiple steps"""
441 | try:
442 | self.logger.info(" 1.1: Testing single analysis session")
443 |
444 | # Step 1: Start analysis
445 | self.logger.info(" 1.1.1: Step 1 - Initial analysis")
446 | response1, continuation_id = self.call_mcp_tool(
447 | "analyze",
448 | {
449 | "step": "I need to analyze this Python microservice codebase for architectural patterns, design decisions, and improvement opportunities. Let me start by examining the overall structure and understanding the technology stack.",
450 | "step_number": 1,
451 | "total_steps": 4,
452 | "next_step_required": True,
453 | "findings": "Starting analysis of FastAPI microservice with PostgreSQL, Redis, and SQLAlchemy. Initial examination shows user management functionality with caching layer.",
454 | "files_checked": [self.main_service_file],
455 | "relevant_files": [self.main_service_file, self.config_file, self.models_file, self.utils_file],
456 | "prompt": "Analyze this microservice architecture for scalability, maintainability, and design patterns",
457 | "analysis_type": "architecture",
458 | },
459 | )
460 |
461 | if not response1 or not continuation_id:
462 | self.logger.error("Failed to get initial analysis response")
463 | return False
464 |
465 | # Parse and validate JSON response
466 | response1_data = self._parse_analyze_response(response1)
467 | if not response1_data:
468 | return False
469 |
470 | # Validate step 1 response structure - expect pause_for_analysis for next_step_required=True
471 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_analysis"):
472 | return False
473 |
474 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
475 |
476 | # Step 2: Deeper examination
477 | self.logger.info(" 1.1.2: Step 2 - Architecture examination")
478 | response2, _ = self.call_mcp_tool(
479 | "analyze",
480 | {
481 | "step": "Now examining the configuration and models modules to understand data architecture and configuration management patterns.",
482 | "step_number": 2,
483 | "total_steps": 4,
484 | "next_step_required": True,
485 | "findings": "Found several architectural concerns: direct Redis dependency in service class, global configuration instance, missing error handling in database operations, and mixed serialization logic in models.",
486 | "files_checked": [self.main_service_file, self.config_file, self.models_file],
487 | "relevant_files": [self.main_service_file, self.config_file, self.models_file],
488 | "relevant_context": ["UserService", "AppConfig", "User.to_dict"],
489 | "issues_found": [
490 | {
491 | "severity": "medium",
492 | "description": "Direct dependency on global Redis client in UserService",
493 | },
494 | {"severity": "low", "description": "Global configuration instance could cause testing issues"},
495 | ],
496 | "confidence": "medium",
497 | "continuation_id": continuation_id,
498 | },
499 | )
500 |
501 | if not response2:
502 | self.logger.error("Failed to continue analysis to step 2")
503 | return False
504 |
505 | response2_data = self._parse_analyze_response(response2)
506 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_analysis"):
507 | return False
508 |
509 | # Check analysis status tracking
510 | analysis_status = response2_data.get("analysis_status", {})
511 | if analysis_status.get("files_checked", 0) < 3:
512 | self.logger.error("Files checked count not properly tracked")
513 | return False
514 |
515 | if analysis_status.get("insights_by_severity", {}).get("medium", 0) < 1:
516 | self.logger.error("Medium severity insights not properly tracked")
517 | return False
518 |
519 | if analysis_status.get("analysis_confidence") != "medium":
520 | self.logger.error("Confidence level not properly tracked")
521 | return False
522 |
523 | self.logger.info(" ✅ Step 2 successful with proper tracking")
524 |
525 | # Store continuation_id for next test
526 | self.analysis_continuation_id = continuation_id
527 | return True
528 |
529 | except Exception as e:
530 | self.logger.error(f"Single analysis session test failed: {e}")
531 | return False
532 |
533 | def _test_analysis_refocus_flow(self) -> bool:
534 | """Test analysis flow that requires refocusing to revise findings"""
535 | try:
536 | self.logger.info(" 1.2: Testing analysis refocus workflow")
537 |
538 | # Start a new analysis for testing refocus behaviour
539 | self.logger.info(" 1.2.1: Start analysis for refocus test")
540 | response1, continuation_id = self.call_mcp_tool(
541 | "analyze",
542 | {
543 | "step": "Analyzing performance characteristics of the data processing pipeline",
544 | "step_number": 1,
545 | "total_steps": 4,
546 | "next_step_required": True,
547 | "findings": "Initial analysis suggests database queries might be the bottleneck",
548 | "files_checked": [self.main_service_file],
549 | "relevant_files": [self.main_service_file, self.utils_file],
550 | "prompt": "Analyze performance bottlenecks in this microservice",
551 | "analysis_type": "performance",
552 | },
553 | )
554 |
555 | if not response1 or not continuation_id:
556 | self.logger.error("Failed to start refocus test analysis")
557 | return False
558 |
559 | # Step 2: Wrong direction
560 | self.logger.info(" 1.2.2: Step 2 - Incorrect analysis path")
561 | response2, _ = self.call_mcp_tool(
562 | "analyze",
563 | {
564 | "step": "Focusing on database optimization strategies",
565 | "step_number": 2,
566 | "total_steps": 4,
567 | "next_step_required": True,
568 | "findings": "Database queries seem reasonable, might be looking in wrong direction",
569 | "files_checked": [self.main_service_file, self.models_file],
570 | "relevant_files": [],
571 | "relevant_context": [],
572 | "issues_found": [],
573 | "confidence": "low",
574 | "continuation_id": continuation_id,
575 | },
576 | )
577 |
578 | if not response2:
579 | self.logger.error("Failed to continue to step 2")
580 | return False
581 |
582 | # Step 3: Adjust investigation path
583 | self.logger.info(" 1.2.3: Step 3 - Refocus the analysis")
584 | response3, _ = self.call_mcp_tool(
585 | "analyze",
586 | {
587 | "step": "Refocus - the performance issue might not be database related. Let me examine the caching and serialization patterns instead.",
588 | "step_number": 3,
589 | "total_steps": 4,
590 | "next_step_required": True,
591 | "findings": "Found potential performance issues in JSON serialization and cache key generation patterns in utils module",
592 | "files_checked": [self.utils_file, self.models_file],
593 | "relevant_files": [self.utils_file, self.models_file],
594 | "relevant_context": ["generate_cache_key", "User.to_dict", "sanitize_input"],
595 | "issues_found": [
596 | {"severity": "medium", "description": "JSON serialization in model classes could be optimized"},
597 | {"severity": "low", "description": "Cache key generation lacks proper escaping"},
598 | ],
599 | "confidence": "medium",
600 | "continuation_id": continuation_id,
601 | },
602 | )
603 |
604 | if not response3:
605 | self.logger.error("Failed to refocus analysis")
606 | return False
607 |
608 | response3_data = self._parse_analyze_response(response3)
609 | if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_analysis"):
610 | return False
611 |
612 | self.logger.info(" ✅ Analysis refocus flow working correctly")
613 | return True
614 |
615 | except Exception as e:
616 |             self.logger.error(f"Analysis refocus test failed: {e}")
617 | return False
618 |
619 | def _test_complete_analysis_with_expert(self) -> bool:
620 | """Test complete analysis ending with expert validation"""
621 | try:
622 | self.logger.info(" 1.3: Testing complete analysis with expert validation")
623 |
624 | # Use the continuation from first test
625 | continuation_id = getattr(self, "analysis_continuation_id", None)
626 | if not continuation_id:
627 | # Start fresh if no continuation available
628 | self.logger.info(" 1.3.0: Starting fresh analysis")
629 | response0, continuation_id = self.call_mcp_tool(
630 | "analyze",
631 | {
632 | "step": "Analyzing the microservice architecture for improvement opportunities",
633 | "step_number": 1,
634 | "total_steps": 2,
635 | "next_step_required": True,
636 | "findings": "Found dependency injection and configuration management issues",
637 | "files_checked": [self.main_service_file, self.config_file],
638 | "relevant_files": [self.main_service_file, self.config_file],
639 | "relevant_context": ["UserService", "AppConfig"],
640 | "prompt": "Analyze architectural patterns and improvement opportunities",
641 | "analysis_type": "architecture",
642 | },
643 | )
644 | if not response0 or not continuation_id:
645 | self.logger.error("Failed to start fresh analysis")
646 | return False
647 |
648 | # Final step - trigger expert validation
649 | self.logger.info(" 1.3.1: Final step - complete analysis")
650 | response_final, _ = self.call_mcp_tool(
651 | "analyze",
652 | {
653 | "step": "Analysis complete. I have identified key architectural patterns and strategic improvement opportunities across scalability, maintainability, and performance dimensions.",
654 | "step_number": 2,
655 | "total_steps": 2,
656 | "next_step_required": False, # Final step - triggers expert validation
657 | "findings": "Key findings: 1) Tight coupling via global dependencies, 2) Missing error handling and transaction management, 3) Mixed concerns in model classes, 4) Configuration management could be more flexible, 5) Opportunities for dependency injection and better separation of concerns.",
658 | "files_checked": [self.main_service_file, self.config_file, self.models_file, self.utils_file],
659 | "relevant_files": [self.main_service_file, self.config_file, self.models_file, self.utils_file],
660 | "relevant_context": ["UserService", "AppConfig", "User", "validate_email"],
661 | "issues_found": [
662 | {"severity": "high", "description": "Tight coupling via global Redis client and configuration"},
663 | {"severity": "medium", "description": "Missing transaction management in create_user"},
664 | {"severity": "medium", "description": "Serialization logic mixed with model classes"},
665 | {"severity": "low", "description": "Magic numbers and hardcoded values scattered throughout"},
666 | ],
667 | "confidence": "high",
668 | "continuation_id": continuation_id,
669 | "model": "flash", # Use flash for expert validation
670 | },
671 | )
672 |
673 | if not response_final:
674 | self.logger.error("Failed to complete analysis")
675 | return False
676 |
677 | response_final_data = self._parse_analyze_response(response_final)
678 | if not response_final_data:
679 | return False
680 |
681 | # Validate final response structure - expect calling_expert_analysis for next_step_required=False
682 | if response_final_data.get("status") != "calling_expert_analysis":
683 | self.logger.error(
684 | f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
685 | )
686 | return False
687 |
688 | if not response_final_data.get("analysis_complete"):
689 | self.logger.error("Expected analysis_complete=true for final step")
690 |                 return False
691 |             if "expert_analysis" not in response_final_data:  # Check for expert analysis
692 | self.logger.error("Missing expert_analysis in final response")
693 | return False
694 |
695 | expert_analysis = response_final_data.get("expert_analysis", {})
696 |
697 | # Check for expected analysis content (checking common patterns)
698 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
699 |
700 | # Look for architectural analysis indicators
701 | arch_indicators = ["architecture", "pattern", "coupling", "dependency", "scalability", "maintainability"]
702 | found_indicators = sum(1 for indicator in arch_indicators if indicator in analysis_text)
703 |
704 | if found_indicators >= 3:
705 | self.logger.info(" ✅ Expert analysis identified architectural patterns correctly")
706 | else:
707 | self.logger.warning(
708 | f" ⚠️ Expert analysis may not have fully analyzed architecture (found {found_indicators}/6 indicators)"
709 | )
710 |
711 | # Check complete analysis summary
712 | if "complete_analysis" not in response_final_data:
713 | self.logger.error("Missing complete_analysis in final response")
714 | return False
715 |
716 | complete_analysis = response_final_data["complete_analysis"]
717 | if not complete_analysis.get("relevant_context"):
718 | self.logger.error("Missing relevant context in complete analysis")
719 | return False
720 |
721 | if "UserService" not in complete_analysis["relevant_context"]:
722 | self.logger.error("Expected context not found in analysis summary")
723 | return False
724 |
725 | self.logger.info(" ✅ Complete analysis with expert validation successful")
726 | return True
727 |
728 | except Exception as e:
729 | self.logger.error(f"Complete analysis test failed: {e}")
730 | return False
731 |
732 | def _test_certain_confidence(self) -> bool:
733 | """Test final step analysis completion (analyze tool doesn't use confidence levels)"""
734 | try:
735 | self.logger.info(" 1.4: Testing final step analysis completion")
736 |
737 | # Test final step - analyze tool doesn't use confidence levels, but we test completion
738 | self.logger.info(" 1.4.1: Final step analysis")
739 | response_final, _ = self.call_mcp_tool(
740 | "analyze",
741 | {
742 | "step": "I have completed a comprehensive analysis of the architectural patterns and improvement opportunities.",
743 | "step_number": 1,
744 | "total_steps": 1,
745 | "next_step_required": False, # Final step - should trigger expert analysis
746 | "findings": "Complete architectural analysis reveals: FastAPI microservice with clear separation needs, dependency injection opportunities, and performance optimization potential. Key patterns identified: service layer, repository-like data access, configuration management, and utility functions.",
747 | "files_checked": [self.main_service_file, self.config_file, self.models_file, self.utils_file],
748 | "relevant_files": [self.main_service_file, self.config_file, self.models_file, self.utils_file],
749 | "relevant_context": ["UserService", "AppConfig", "User", "validate_email"],
750 | "issues_found": [
751 | {"severity": "high", "description": "Global dependencies create tight coupling"},
752 | {"severity": "medium", "description": "Transaction management missing in critical operations"},
753 | ],
754 | "prompt": "Comprehensive architectural analysis",
755 | "analysis_type": "architecture",
756 | "model": "flash",
757 | },
758 | )
759 |
760 | if not response_final:
761 | self.logger.error("Failed to test final step analysis")
762 | return False
763 |
764 | response_final_data = self._parse_analyze_response(response_final)
765 | if not response_final_data:
766 | return False
767 |
768 | # Validate final step response - should trigger expert analysis
769 | expected_status = "calling_expert_analysis"
770 | if response_final_data.get("status") != expected_status:
771 | self.logger.error(f"Expected status '{expected_status}', got '{response_final_data.get('status')}'")
772 | return False
773 |
774 | # Check that expert analysis was performed
775 | expert_analysis = response_final_data.get("expert_analysis", {})
776 | if not expert_analysis:
777 | self.logger.error("Expert analysis should be present for final step")
778 | return False
779 |
780 | # Expert analysis should complete successfully
781 | if expert_analysis.get("status") != "analysis_complete":
782 | self.logger.error(
783 | f"Expert analysis status: {expert_analysis.get('status')} (expected analysis_complete)"
784 | )
785 | return False
786 |
787 | self.logger.info(" ✅ Final step analysis completion working correctly")
788 | return True
789 |
790 | except Exception as e:
791 | self.logger.error(f"Final step analysis test failed: {e}")
792 | return False
793 |
794 | def _test_context_aware_file_embedding(self) -> bool:
795 | """Test context-aware file embedding optimization"""
796 | try:
797 | self.logger.info(" 1.5: Testing context-aware file embedding")
798 |
799 | # Test 1: New conversation, intermediate step - should only reference files
800 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
801 | response1, continuation_id = self.call_mcp_tool(
802 | "analyze",
803 | {
804 | "step": "Starting architectural analysis of microservice components",
805 | "step_number": 1,
806 | "total_steps": 3,
807 | "next_step_required": True, # Intermediate step
808 | "findings": "Initial analysis of service layer and configuration patterns",
809 | "files_checked": [self.main_service_file, self.config_file],
810 | "relevant_files": [self.main_service_file], # This should be referenced, not embedded
811 | "relevant_context": ["UserService"],
812 | "issues_found": [{"severity": "medium", "description": "Direct Redis dependency in service class"}],
813 | "confidence": "low",
814 | "prompt": "Analyze service architecture patterns",
815 | "analysis_type": "architecture",
816 | "model": "flash",
817 | },
818 | )
819 |
820 | if not response1 or not continuation_id:
821 | self.logger.error("Failed to start context-aware file embedding test")
822 | return False
823 |
824 | response1_data = self._parse_analyze_response(response1)
825 | if not response1_data:
826 | return False
827 |
828 | # Check file context - should be reference_only for intermediate step
829 | file_context = response1_data.get("file_context", {})
830 | if file_context.get("type") != "reference_only":
831 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
832 | return False
833 |
834 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
835 | self.logger.error("Expected context optimization message for reference_only")
836 | return False
837 |
838 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
839 |
840 | # Test 2: Final step - should embed files for expert validation
841 | self.logger.info(" 1.5.2: Final step (should embed files)")
842 | response2, _ = self.call_mcp_tool(
843 | "analyze",
844 | {
845 | "step": "Analysis complete - identified key architectural patterns and improvement opportunities",
846 | "step_number": 2,
847 | "total_steps": 2,
848 | "next_step_required": False, # Final step - should embed files
849 | "continuation_id": continuation_id,
850 | "findings": "Complete analysis reveals dependency injection opportunities, configuration management improvements, and separation of concerns enhancements",
851 | "files_checked": [self.main_service_file, self.config_file, self.models_file],
852 | "relevant_files": [self.main_service_file, self.config_file], # Should be fully embedded
853 | "relevant_context": ["UserService", "AppConfig"],
854 | "issues_found": [
855 | {"severity": "high", "description": "Global dependencies create architectural coupling"},
856 | {"severity": "medium", "description": "Configuration management lacks flexibility"},
857 | ],
858 | "confidence": "high",
859 | "model": "flash",
860 | },
861 | )
862 |
863 | if not response2:
864 | self.logger.error("Failed to complete to final step")
865 | return False
866 |
867 | response2_data = self._parse_analyze_response(response2)
868 | if not response2_data:
869 | return False
870 |
871 | # Check file context - should be fully_embedded for final step
872 | file_context2 = response2_data.get("file_context", {})
873 | if file_context2.get("type") != "fully_embedded":
874 | self.logger.error(
875 | f"Expected fully_embedded file context for final step, got: {file_context2.get('type')}"
876 | )
877 | return False
878 |
879 | if "Full file content embedded for expert analysis" not in file_context2.get("context_optimization", ""):
880 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
881 | return False
882 |
883 | # Verify expert analysis was called for final step
884 | if response2_data.get("status") != "calling_expert_analysis":
885 | self.logger.error("Final step should trigger expert analysis")
886 | return False
887 |
888 | if "expert_analysis" not in response2_data:
889 | self.logger.error("Expert analysis should be present in final step")
890 | return False
891 |
892 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
893 | return True
894 |
895 | except Exception as e:
896 | self.logger.error(f"Context-aware file embedding test failed: {e}")
897 | return False
898 |
899 | def _test_analysis_types(self) -> bool:
900 | """Test different analysis types (architecture, performance, security, quality)"""
901 | try:
902 | self.logger.info(" 1.6: Testing different analysis types")
903 |
904 | # Test security analysis
905 | self.logger.info(" 1.6.1: Security analysis")
906 | response_security, _ = self.call_mcp_tool(
907 | "analyze",
908 | {
909 | "step": "Conducting security analysis of authentication and data handling patterns",
910 | "step_number": 1,
911 | "total_steps": 1,
912 | "next_step_required": False,
913 | "findings": "Security analysis reveals: password hashing implementation, input validation patterns, SQL injection prevention via parameterized queries, but missing input sanitization in some areas and weak default secret key handling.",
914 | "files_checked": [self.main_service_file, self.utils_file],
915 | "relevant_files": [self.main_service_file, self.utils_file],
916 | "relevant_context": ["hash_password", "validate_email", "sanitize_input"],
917 | "issues_found": [
918 | {"severity": "critical", "description": "Weak default secret key in production detection"},
919 | {"severity": "medium", "description": "Input sanitization not consistently applied"},
920 | ],
921 | "confidence": "high",
922 | "prompt": "Analyze security patterns and vulnerabilities",
923 | "analysis_type": "security",
924 | "model": "flash",
925 | },
926 | )
927 |
928 | if not response_security:
929 | self.logger.error("Failed security analysis test")
930 | return False
931 |
932 | response_security_data = self._parse_analyze_response(response_security)
933 | if not response_security_data:
934 | return False
935 |
936 | # Check that security analysis was processed
937 | issues = response_security_data.get("complete_analysis", {}).get("issues_found", [])
938 | critical_issues = [issue for issue in issues if issue.get("severity") == "critical"]
939 |
940 | if not critical_issues:
941 | self.logger.warning("Security analysis should have identified critical security issues")
942 | else:
943 | self.logger.info(" ✅ Security analysis identified critical issues")
944 |
945 | # Test quality analysis
946 | self.logger.info(" 1.6.2: Quality analysis")
947 | response_quality, _ = self.call_mcp_tool(
948 | "analyze",
949 | {
950 | "step": "Conducting code quality analysis focusing on maintainability and best practices",
951 | "step_number": 1,
952 | "total_steps": 1,
953 | "next_step_required": False,
954 | "findings": "Code quality analysis shows: good use of type hints, proper error handling in some areas but missing in others, mixed separation of concerns, and opportunities for better abstraction.",
955 | "files_checked": [self.models_file, self.utils_file],
956 | "relevant_files": [self.models_file, self.utils_file],
957 | "relevant_context": ["User.to_dict", "ValidationError", "PerformanceTimer"],
958 | "issues_found": [
959 | {"severity": "medium", "description": "Serialization logic mixed with model classes"},
960 | {"severity": "low", "description": "Inconsistent error handling patterns"},
961 | ],
962 | "confidence": "high",
963 | "prompt": "Analyze code quality and maintainability patterns",
964 | "analysis_type": "quality",
965 | "model": "flash",
966 | },
967 | )
968 |
969 | if not response_quality:
970 | self.logger.error("Failed quality analysis test")
971 | return False
972 |
973 | response_quality_data = self._parse_analyze_response(response_quality)
974 | if not response_quality_data:
975 | return False
976 |
977 | # Verify quality analysis was processed
978 | quality_context = response_quality_data.get("complete_analysis", {}).get("relevant_context", [])
979 | if not any("User" in ctx for ctx in quality_context):
980 | self.logger.warning("Quality analysis should have analyzed model classes")
981 | else:
982 | self.logger.info(" ✅ Quality analysis examined relevant code elements")
983 |
984 | self.logger.info(" ✅ Different analysis types test completed successfully")
985 | return True
986 |
987 | except Exception as e:
988 | self.logger.error(f"Analysis types test failed: {e}")
989 | return False
990 |
991 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
992 | """Call an MCP tool in-process - override for analyze-specific response handling"""
993 | # Use in-process implementation to maintain conversation memory
994 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
995 |
996 | if not response_text:
997 | return None, None
998 |
999 | # Extract continuation_id from analyze response specifically
1000 | continuation_id = self._extract_analyze_continuation_id(response_text)
1001 |
1002 | return response_text, continuation_id
1003 |
1004 | def _extract_analyze_continuation_id(self, response_text: str) -> Optional[str]:
1005 | """Extract continuation_id from analyze response"""
1006 | try:
1007 | # Parse the response
1008 | response_data = json.loads(response_text)
1009 | return response_data.get("continuation_id")
1010 |
1011 | except json.JSONDecodeError as e:
1012 | self.logger.debug(f"Failed to parse response for analyze continuation_id: {e}")
1013 | return None
1014 |
1015 | def _parse_analyze_response(self, response_text: str) -> dict:
1016 | """Parse analyze tool JSON response"""
1017 | try:
1018 | # Parse the response - it should be direct JSON
1019 | return json.loads(response_text)
1020 |
1021 | except json.JSONDecodeError as e:
1022 | self.logger.error(f"Failed to parse analyze response as JSON: {e}")
1023 | self.logger.error(f"Response text: {response_text[:500]}...")
1024 | return {}
1025 |
1026 | def _validate_step_response(
1027 | self,
1028 | response_data: dict,
1029 | expected_step: int,
1030 | expected_total: int,
1031 | expected_next_required: bool,
1032 | expected_status: str,
1033 | ) -> bool:
1034 | """Validate an analyze investigation step response structure"""
1035 | try:
1036 | # Check status
1037 | if response_data.get("status") != expected_status:
1038 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
1039 | return False
1040 |
1041 | # Check step number
1042 | if response_data.get("step_number") != expected_step:
1043 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
1044 | return False
1045 |
1046 | # Check total steps
1047 | if response_data.get("total_steps") != expected_total:
1048 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
1049 | return False
1050 |
1051 | # Check next_step_required
1052 | if response_data.get("next_step_required") != expected_next_required:
1053 | self.logger.error(
1054 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
1055 | )
1056 | return False
1057 |
1058 | # Check analysis_status exists
1059 | if "analysis_status" not in response_data:
1060 | self.logger.error("Missing analysis_status in response")
1061 | return False
1062 |
1063 | # Check next_steps guidance
1064 | if not response_data.get("next_steps"):
1065 | self.logger.error("Missing next_steps guidance in response")
1066 | return False
1067 |
1068 | return True
1069 |
1070 | except Exception as e:
1071 | self.logger.error(f"Error validating step response: {e}")
1072 | return False
1073 |
```
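Before the next file, a condensed sketch of the two-phase contract the test above drives: intermediate steps (`next_step_required=True`) return status `pause_for_analysis` with a `reference_only` file context, while the final step returns `calling_expert_analysis` with files fully embedded. Here `call_analyze` is a hypothetical stand-in for the harness's `call_mcp_tool("analyze", params)`; the field names and status values mirror the assertions above:

```python
import json

def drive_analyze(call_analyze) -> dict:
    # Step 1: intermediate step; the server pauses and waits for more findings.
    step1_text, continuation_id = call_analyze({
        "step": "Survey the service architecture",
        "step_number": 1,
        "total_steps": 2,
        "next_step_required": True,
        "findings": "Initial pass over the service layer",
        "relevant_files": ["main_service.py"],
        "prompt": "Analyze architectural patterns",
        "analysis_type": "architecture",
    })
    assert json.loads(step1_text)["status"] == "pause_for_analysis"

    # Step 2: final step; next_step_required=False triggers expert validation.
    step2_text, _ = call_analyze({
        "step": "Analysis complete",
        "step_number": 2,
        "total_steps": 2,
        "next_step_required": False,
        "findings": "Tight coupling via global dependencies",
        "relevant_files": ["main_service.py"],
        "continuation_id": continuation_id,
        "model": "flash",
    })
    step2 = json.loads(step2_text)
    assert step2["status"] == "calling_expert_analysis"
    return step2["expert_analysis"]
```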
--------------------------------------------------------------------------------
/simulator_tests/test_refactor_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Refactor Tool Validation Test
4 |
5 | Tests the refactor tool's capabilities using the new workflow architecture.
6 | This validates the step-by-step refactoring analysis pattern with expert validation.
7 | """
8 |
9 | import json
10 | from typing import Optional
11 |
12 | from .conversation_base_test import ConversationBaseTest
13 |
14 |
15 | class RefactorValidationTest(ConversationBaseTest):
16 | """Test refactor tool with new workflow architecture"""
17 |
18 | @property
19 | def test_name(self) -> str:
20 | return "refactor_validation"
21 |
22 | @property
23 | def test_description(self) -> str:
24 | return "Refactor tool validation with new workflow architecture"
25 |
26 | def run_test(self) -> bool:
27 | """Test refactor tool capabilities"""
28 | # Set up the test environment
29 | self.setUp()
30 |
31 | try:
32 | self.logger.info("Test: Refactor tool validation (new architecture)")
33 |
34 | # Create test files with refactoring opportunities
35 | self._create_refactoring_test_code()
36 |
37 | # Test 1: Single refactoring analysis session with multiple steps
38 | if not self._test_single_refactoring_session():
39 | return False
40 |
41 | # Test 2: Refactoring analysis requiring refocus
42 | if not self._test_refactoring_refocus_flow():
43 | return False
44 |
45 | # Test 3: Complete refactoring analysis with expert analysis
46 | if not self._test_complete_refactoring_with_analysis():
47 | return False
48 |
49 | # Test 4: Certain confidence with complete refactor_result_confidence
50 | if not self._test_certain_confidence_complete_refactoring():
51 | return False
52 |
53 | # Test 5: Context-aware file embedding for refactoring
54 | if not self._test_context_aware_refactoring_file_embedding():
55 | return False
56 |
57 | # Test 6: Different refactor types
58 | if not self._test_different_refactor_types():
59 | return False
60 |
61 | self.logger.info(" ✅ All refactor validation tests passed")
62 | return True
63 |
64 | except Exception as e:
65 | self.logger.error(f"Refactor validation test failed: {e}")
66 | return False
67 |
68 | def _create_refactoring_test_code(self):
69 | """Create test files with various refactoring opportunities"""
70 | # Create a Python file with obvious code smells and decomposition opportunities
71 | refactor_code = """#!/usr/bin/env python3
72 | import json
73 | import os
74 | from datetime import datetime
75 |
76 | # Code smell: Large class with multiple responsibilities
77 | class DataProcessorManager:
78 | def __init__(self, config_file):
79 | self.config = self._load_config(config_file)
80 | self.processed_count = 0
81 | self.error_count = 0
82 | self.log_file = "processing.log"
83 |
84 | def _load_config(self, config_file):
85 | \"\"\"Load configuration from file\"\"\"
86 | with open(config_file, 'r') as f:
87 | return json.load(f)
88 |
89 | # Code smell: Long method doing too many things (decompose opportunity)
90 | def process_user_data(self, user_data, validation_rules, output_format):
91 | \"\"\"Process user data with validation and formatting\"\"\"
92 | # Validation logic
93 | if not user_data:
94 | print("Error: No user data") # Code smell: print instead of logging
95 | return None
96 |
97 | if not isinstance(user_data, dict):
98 | print("Error: Invalid data format")
99 | return None
100 |
101 | # Check required fields
102 | required_fields = ['name', 'email', 'age']
103 | for field in required_fields:
104 | if field not in user_data:
105 | print(f"Error: Missing field {field}")
106 | return None
107 |
108 | # Apply validation rules
109 | for rule in validation_rules:
110 | if rule['field'] == 'email':
111 | if '@' not in user_data['email']: # Code smell: simple validation
112 | print("Error: Invalid email")
113 | return None
114 | elif rule['field'] == 'age':
115 | if user_data['age'] < 18: # Code smell: magic number
116 | print("Error: Age too young")
117 | return None
118 |
119 | # Data processing
120 | processed_data = {}
121 | processed_data['full_name'] = user_data['name'].title()
122 | processed_data['email_domain'] = user_data['email'].split('@')[1]
123 | processed_data['age_category'] = 'adult' if user_data['age'] >= 18 else 'minor'
124 |
125 | # Code smell: Duplicate date formatting logic
126 | if output_format == 'json':
127 | processed_data['processed_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
128 | result = json.dumps(processed_data, ensure_ascii=False)
129 | elif output_format == 'csv':
130 | processed_data['processed_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
131 | result = f"{processed_data['full_name']},{processed_data['email_domain']},{processed_data['age_category']}"
132 | else:
133 | processed_data['processed_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
134 | result = str(processed_data)
135 |
136 | # Logging and statistics
137 | self.processed_count += 1
138 |         with open(self.log_file, 'a') as f:  # Code smell: ad-hoc log writes mixed into business logic
139 | f.write(f"Processed: {user_data['name']} at {datetime.now()}\\n")
140 |
141 | return result
142 |
143 | # Code smell: Another long method (decompose opportunity)
144 | def batch_process_files(self, file_list, output_dir):
145 | \"\"\"Process multiple files in batch\"\"\"
146 | results = []
147 |
148 | for file_path in file_list:
149 | # File validation
150 | if not os.path.exists(file_path):
151 | print(f"Error: File {file_path} not found")
152 | continue
153 |
154 | if not file_path.endswith('.json'):
155 | print(f"Error: File {file_path} is not JSON")
156 | continue
157 |
158 | # Read and process file
159 | try:
160 | with open(file_path, 'r') as f:
161 | data = json.load(f)
162 |
163 | # Code smell: Nested loops and complex logic
164 | for user_id, user_data in data.items():
165 | if isinstance(user_data, dict):
166 | # Duplicate validation logic from process_user_data
167 | if 'name' in user_data and 'email' in user_data:
168 | if '@' in user_data['email']:
169 | # More processing...
170 | processed = {
171 | 'id': user_id,
172 | 'name': user_data['name'].title(),
173 | 'email': user_data['email'].lower()
174 | }
175 | results.append(processed)
176 |
177 | # Write output file
178 | output_file = os.path.join(output_dir, f"processed_{os.path.basename(file_path)}")
179 | with open(output_file, 'w') as f:
180 | json.dump(results, f, indent=2)
181 |
182 | except Exception as e:
183 | print(f"Error processing file {file_path}: {e}")
184 | self.error_count += 1
185 |
186 | return results
187 |
188 | # Code smell: Method doing file I/O and business logic
189 | def generate_report(self):
190 | \"\"\"Generate processing report\"\"\"
191 | report_data = {
192 | 'total_processed': self.processed_count,
193 | 'total_errors': self.error_count,
194 | 'success_rate': (self.processed_count / (self.processed_count + self.error_count)) * 100 if (self.processed_count + self.error_count) > 0 else 0,
195 | 'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
196 | }
197 |
198 | # Write to multiple formats (code smell: duplicate logic)
199 | with open('report.json', 'w') as f:
200 | json.dump(report_data, f, indent=2)
201 |
202 | with open('report.txt', 'w') as f:
203 | f.write(f"Processing Report\\n")
204 | f.write(f"================\\n")
205 | f.write(f"Total Processed: {report_data['total_processed']}\\n")
206 | f.write(f"Total Errors: {report_data['total_errors']}\\n")
207 | f.write(f"Success Rate: {report_data['success_rate']:.2f}%\\n")
208 | f.write(f"Generated: {report_data['generated_at']}\\n")
209 |
210 | return report_data
211 |
212 | # Code smell: Utility functions that could be in a separate module
213 | def validate_email(email):
214 | \"\"\"Simple email validation\"\"\"
215 | return '@' in email and '.' in email
216 |
217 | def format_name(name):
218 | \"\"\"Format name to title case\"\"\"
219 | return name.title() if name else ""
220 |
221 | def calculate_age_category(age):
222 | \"\"\"Calculate age category\"\"\"
223 | if age < 18:
224 | return 'minor'
225 | elif age < 65:
226 | return 'adult'
227 | else:
228 | return 'senior'
229 | """
230 |
231 | # Create test file with refactoring opportunities
232 | self.refactor_file = self.create_additional_test_file("data_processor_manager.py", refactor_code)
233 | self.logger.info(f" ✅ Created test file with refactoring opportunities: {self.refactor_file}")
234 |
235 | # Create a smaller file for focused testing
236 | small_refactor_code = """#!/usr/bin/env python3
237 |
238 | # Code smell: God function
239 | def process_everything(data, config, logger):
240 | \"\"\"Function that does too many things\"\"\"
241 | # Validation
242 | if not data:
243 | print("No data") # Should use logger
244 | return None
245 |
246 | # Processing
247 | result = []
248 | for item in data:
249 | if item > 5: # Magic number
250 | result.append(item * 2) # Magic number
251 |
252 | # Logging
253 | print(f"Processed {len(result)} items")
254 |
255 | # File I/O
256 | with open("output.txt", "w") as f:
257 | f.write(str(result))
258 |
259 | return result
260 |
261 | # Modernization opportunity: Could use dataclass
262 | class UserData:
263 | def __init__(self, name, email, age):
264 | self.name = name
265 | self.email = email
266 | self.age = age
267 |
268 | def to_dict(self):
269 | return {
270 | 'name': self.name,
271 | 'email': self.email,
272 | 'age': self.age
273 | }
274 | """
275 |
276 | self.small_refactor_file = self.create_additional_test_file("simple_processor.py", small_refactor_code)
277 | self.logger.info(f" ✅ Created small test file: {self.small_refactor_file}")
278 |
279 | def _test_single_refactoring_session(self) -> bool:
280 | """Test a complete refactoring analysis session with multiple steps"""
281 | try:
282 | self.logger.info(" 1.1: Testing single refactoring analysis session")
283 |
284 | # Step 1: Start refactoring analysis
285 | self.logger.info(" 1.1.1: Step 1 - Initial refactoring investigation")
286 | response1, continuation_id = self.call_mcp_tool(
287 | "refactor",
288 | {
289 | "step": "Starting refactoring analysis of the data processor code. Let me examine the code structure and identify opportunities for decomposition, code smell fixes, and modernization.",
290 | "step_number": 1,
291 | "total_steps": 4,
292 | "next_step_required": True,
293 | "findings": "Initial scan shows a large DataProcessorManager class with multiple responsibilities. The class handles configuration, data processing, file I/O, and logging - violating single responsibility principle.",
294 | "files_checked": [self.refactor_file],
295 | "relevant_files": [self.refactor_file],
296 | "confidence": "incomplete",
297 | "refactor_type": "codesmells",
298 | "focus_areas": ["maintainability", "readability"],
299 | },
300 | )
301 |
302 | if not response1 or not continuation_id:
303 | self.logger.error("Failed to get initial refactoring response")
304 | return False
305 |
306 | # Parse and validate JSON response
307 | response1_data = self._parse_refactor_response(response1)
308 | if not response1_data:
309 | return False
310 |
311 | # Validate step 1 response structure - expect pause_for_refactoring_analysis for next_step_required=True
312 | if not self._validate_refactoring_step_response(
313 | response1_data, 1, 4, True, "pause_for_refactoring_analysis"
314 | ):
315 | return False
316 |
317 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
318 |
319 | # Step 2: Deeper analysis
320 | self.logger.info(" 1.1.2: Step 2 - Detailed code analysis")
321 | response2, _ = self.call_mcp_tool(
322 | "refactor",
323 | {
324 | "step": "Now examining the specific methods and identifying concrete refactoring opportunities. Found multiple code smells and decomposition needs.",
325 | "step_number": 2,
326 | "total_steps": 4,
327 | "next_step_required": True,
328 | "findings": "Identified several major issues: 1) process_user_data method is 50+ lines doing validation, processing, and I/O. 2) Duplicate validation logic. 3) Magic numbers (18 for age). 4) print statements instead of proper logging. 5) File handling without proper context management.",
329 | "files_checked": [self.refactor_file],
330 | "relevant_files": [self.refactor_file],
331 | "relevant_context": [
332 | "DataProcessorManager.process_user_data",
333 | "DataProcessorManager.batch_process_files",
334 | ],
335 | "issues_found": [
336 | {
337 | "type": "codesmells",
338 | "severity": "high",
339 | "description": "Long method: process_user_data does too many things",
340 | },
341 | {
342 | "type": "codesmells",
343 | "severity": "medium",
344 | "description": "Magic numbers: age validation uses hardcoded 18",
345 | },
346 | {
347 | "type": "codesmells",
348 | "severity": "medium",
349 | "description": "Duplicate validation logic in multiple places",
350 | },
351 | ],
352 | "confidence": "partial",
353 | "continuation_id": continuation_id,
354 | },
355 | )
356 |
357 | if not response2:
358 | self.logger.error("Failed to continue refactoring analysis to step 2")
359 | return False
360 |
361 | response2_data = self._parse_refactor_response(response2)
362 | if not self._validate_refactoring_step_response(
363 | response2_data, 2, 4, True, "pause_for_refactoring_analysis"
364 | ):
365 | return False
366 |
367 | # Check refactoring status tracking
368 | refactoring_status = response2_data.get("refactoring_status", {})
369 | if refactoring_status.get("files_checked", 0) < 1:
370 | self.logger.error("Files checked count not properly tracked")
371 | return False
372 |
373 | opportunities_by_type = refactoring_status.get("opportunities_by_type", {})
374 | if "codesmells" not in opportunities_by_type:
375 | self.logger.error("Code smells not properly tracked in opportunities")
376 | return False
377 |
378 | if refactoring_status.get("refactor_confidence") != "partial":
379 | self.logger.error("Refactor confidence not properly tracked")
380 | return False
381 |
382 | self.logger.info(" ✅ Step 2 successful with proper refactoring tracking")
383 |
384 | # Store continuation_id for next test
385 | self.refactoring_continuation_id = continuation_id
386 | return True
387 |
388 | except Exception as e:
389 | self.logger.error(f"Single refactoring session test failed: {e}")
390 | return False
391 |
392 | def _test_refactoring_refocus_flow(self) -> bool:
393 | """Test refactoring analysis that shifts focus mid-investigation"""
394 | try:
395 | self.logger.info(" 1.2: Testing refactoring analysis refocus workflow")
396 |
397 |             # Start a new refactoring analysis for testing refocus behavior
398 | self.logger.info(" 1.2.1: Start refactoring analysis for refocus test")
399 | response1, continuation_id = self.call_mcp_tool(
400 | "refactor",
401 | {
402 | "step": "Analyzing code for decomposition opportunities",
403 | "step_number": 1,
404 | "total_steps": 4,
405 | "next_step_required": True,
406 | "findings": "Initial focus on class-level decomposition",
407 | "files_checked": [self.small_refactor_file],
408 | "relevant_files": [self.small_refactor_file],
409 | "confidence": "incomplete",
410 | "refactor_type": "decompose",
411 | },
412 | )
413 |
414 | if not response1 or not continuation_id:
415 | self.logger.error("Failed to start refocus test refactoring analysis")
416 | return False
417 |
418 | # Step 2: Wrong direction
419 | self.logger.info(" 1.2.2: Step 2 - Wrong refactoring focus")
420 | response2, _ = self.call_mcp_tool(
421 | "refactor",
422 | {
423 | "step": "Focusing on class decomposition strategies",
424 | "step_number": 2,
425 | "total_steps": 4,
426 | "next_step_required": True,
427 |                     "findings": "Class structure seems reasonable, might be looking in the wrong direction",
428 | "files_checked": [self.small_refactor_file],
429 | "relevant_files": [],
430 | "confidence": "incomplete",
431 | "continuation_id": continuation_id,
432 | },
433 | )
434 |
435 | if not response2:
436 | self.logger.error("Failed to continue to step 2")
437 | return False
438 |
439 |             # Step 3: Refocus after step 2's dead end
440 | self.logger.info(" 1.2.3: Step 3 - Refocus on function decomposition")
441 | response3, _ = self.call_mcp_tool(
442 | "refactor",
443 | {
444 | "step": "Refocusing - the real decomposition opportunity is the god function process_everything. Let me analyze function-level refactoring instead.",
445 | "step_number": 3,
446 | "total_steps": 4,
447 | "next_step_required": True,
448 | "findings": "Found the main decomposition opportunity: process_everything function does validation, processing, logging, and file I/O. Should be split into separate functions with single responsibilities.",
449 | "files_checked": [self.small_refactor_file],
450 | "relevant_files": [self.small_refactor_file],
451 | "relevant_context": ["process_everything"],
452 | "issues_found": [
453 | {
454 | "type": "decompose",
455 | "severity": "high",
456 | "description": "God function: process_everything has multiple responsibilities",
457 | },
458 | {
459 | "type": "codesmells",
460 | "severity": "medium",
461 | "description": "Magic numbers in processing logic",
462 | },
463 | ],
464 | "confidence": "partial",
465 | "continuation_id": continuation_id,
466 | },
467 | )
468 |
469 | if not response3:
470 | self.logger.error("Failed to refocus")
471 | return False
472 |
473 | response3_data = self._parse_refactor_response(response3)
474 | if not self._validate_refactoring_step_response(
475 | response3_data, 3, 4, True, "pause_for_refactoring_analysis"
476 | ):
477 | return False
478 |
479 | self.logger.info(" ✅ Refocus working correctly for refactoring analysis")
480 | return True
481 |
482 | except Exception as e:
483 | self.logger.error(f"Refocusing test failed: {e}")
484 | return False
485 |
486 | def _test_complete_refactoring_with_analysis(self) -> bool:
487 | """Test complete refactoring analysis ending with expert analysis"""
488 | try:
489 | self.logger.info(" 1.3: Testing complete refactoring analysis with expert analysis")
490 |
491 | # Use the continuation from first test
492 | continuation_id = getattr(self, "refactoring_continuation_id", None)
493 | if not continuation_id:
494 | # Start fresh if no continuation available
495 | self.logger.info(" 1.3.0: Starting fresh refactoring analysis")
496 | response0, continuation_id = self.call_mcp_tool(
497 | "refactor",
498 | {
499 | "step": "Analyzing the data processor for comprehensive refactoring opportunities",
500 | "step_number": 1,
501 | "total_steps": 2,
502 | "next_step_required": True,
503 | "findings": "Found multiple refactoring opportunities in DataProcessorManager",
504 | "files_checked": [self.refactor_file],
505 | "relevant_files": [self.refactor_file],
506 | "relevant_context": ["DataProcessorManager.process_user_data"],
507 | "confidence": "partial",
508 | "refactor_type": "codesmells",
509 | },
510 | )
511 | if not response0 or not continuation_id:
512 | self.logger.error("Failed to start fresh refactoring analysis")
513 | return False
514 |
515 | # Final step - trigger expert analysis
516 | self.logger.info(" 1.3.1: Final step - complete refactoring analysis")
517 | response_final, _ = self.call_mcp_tool(
518 | "refactor",
519 | {
520 | "step": "Refactoring analysis complete. Identified comprehensive opportunities for code smell fixes, decomposition, and modernization across the DataProcessorManager class.",
521 | "step_number": 2,
522 | "total_steps": 2,
523 | "next_step_required": False, # Final step - triggers expert analysis
524 | "findings": "Complete analysis shows: 1) Large class violating SRP, 2) Long methods needing decomposition, 3) Duplicate validation logic, 4) Magic numbers, 5) Poor error handling with print statements, 6) File I/O mixed with business logic. All major refactoring opportunities identified with specific line locations.",
525 | "files_checked": [self.refactor_file],
526 | "relevant_files": [self.refactor_file],
527 | "relevant_context": [
528 | "DataProcessorManager.process_user_data",
529 | "DataProcessorManager.batch_process_files",
530 | "DataProcessorManager.generate_report",
531 | ],
532 | "issues_found": [
533 | {
534 | "type": "decompose",
535 | "severity": "critical",
536 | "description": "Large class with multiple responsibilities",
537 | },
538 | {
539 | "type": "codesmells",
540 | "severity": "high",
541 | "description": "Long method: process_user_data (50+ lines)",
542 | },
543 | {"type": "codesmells", "severity": "high", "description": "Duplicate validation logic"},
544 | {"type": "codesmells", "severity": "medium", "description": "Magic numbers in age validation"},
545 | {
546 | "type": "modernize",
547 | "severity": "medium",
548 | "description": "Use proper logging instead of print statements",
549 | },
550 | ],
551 | "confidence": "partial", # Use partial to trigger expert analysis
552 | "continuation_id": continuation_id,
553 | "model": "flash", # Use flash for expert analysis
554 | },
555 | )
556 |
557 | if not response_final:
558 | self.logger.error("Failed to complete refactoring analysis")
559 | return False
560 |
561 | response_final_data = self._parse_refactor_response(response_final)
562 | if not response_final_data:
563 | return False
564 |
565 | # Validate final response structure - expect calling_expert_analysis or files_required_to_continue
566 | expected_statuses = ["calling_expert_analysis", "files_required_to_continue"]
567 | actual_status = response_final_data.get("status")
568 | if actual_status not in expected_statuses:
569 | self.logger.error(f"Expected status to be one of {expected_statuses}, got '{actual_status}'")
570 | return False
571 |
572 | if not response_final_data.get("refactoring_complete"):
573 | self.logger.error("Expected refactoring_complete=true for final step")
574 | return False
575 |
576 | # Check for expert analysis or content (depending on status)
577 | if actual_status == "calling_expert_analysis":
578 | if "expert_analysis" not in response_final_data:
579 | self.logger.error("Missing expert_analysis in final response")
580 | return False
581 | expert_analysis = response_final_data.get("expert_analysis", {})
582 | analysis_content = json.dumps(expert_analysis, ensure_ascii=False).lower()
583 | elif actual_status == "files_required_to_continue":
584 | # For files_required_to_continue, analysis is in content field
585 | if "content" not in response_final_data:
586 | self.logger.error("Missing content in files_required_to_continue response")
587 | return False
588 | expert_analysis = {"content": response_final_data.get("content", "")}
589 | analysis_content = response_final_data.get("content", "").lower()
590 | else:
591 | self.logger.error(f"Unexpected status: {actual_status}")
592 | return False
593 |
594 | # Check for expected analysis content (checking common patterns)
595 | analysis_text = analysis_content
596 |
597 | # Look for refactoring identification
598 | refactor_indicators = ["refactor", "decompose", "code smell", "method", "class", "responsibility"]
599 | found_indicators = sum(1 for indicator in refactor_indicators if indicator in analysis_text)
600 |
601 | if found_indicators >= 3:
602 | self.logger.info(" ✅ Expert analysis identified refactoring opportunities correctly")
603 | else:
604 | self.logger.warning(
605 | f" ⚠️ Expert analysis may not have fully identified refactoring opportunities (found {found_indicators}/6 indicators)"
606 | )
607 |
608 | # Check complete refactoring summary
609 | if "complete_refactoring" not in response_final_data:
610 | self.logger.error("Missing complete_refactoring in final response")
611 | return False
612 |
613 | complete_refactoring = response_final_data["complete_refactoring"]
614 | if not complete_refactoring.get("relevant_context"):
615 | self.logger.error("Missing relevant context in complete refactoring")
616 | return False
617 |
618 | if "DataProcessorManager.process_user_data" not in complete_refactoring["relevant_context"]:
619 | self.logger.error("Expected method not found in refactoring summary")
620 | return False
621 |
622 | self.logger.info(" ✅ Complete refactoring analysis with expert analysis successful")
623 | return True
624 |
625 | except Exception as e:
626 | self.logger.error(f"Complete refactoring analysis test failed: {e}")
627 | return False
628 |
629 | def _test_certain_confidence_complete_refactoring(self) -> bool:
630 | """Test complete confidence - should skip expert analysis"""
631 | try:
632 | self.logger.info(" 1.4: Testing complete confidence behavior")
633 |
634 | # Test complete confidence - should skip expert analysis
635 | self.logger.info(" 1.4.1: Complete confidence refactoring")
636 | response_certain, _ = self.call_mcp_tool(
637 | "refactor",
638 | {
639 | "step": "I have completed comprehensive refactoring analysis with 100% certainty: identified all major opportunities including decomposition, code smells, and modernization.",
640 | "step_number": 1,
641 | "total_steps": 1,
642 | "next_step_required": False, # Final step
643 |                     "findings": "Complete refactoring analysis: 1) process_everything god function needs decomposition into separate validation, processing, logging, and I/O helpers, 2) Replace print statements with proper logging, 3) Extract magic numbers to named constants, 4) Convert UserData to a dataclass for modern Python patterns.",
644 | "files_checked": [self.small_refactor_file],
645 | "relevant_files": [self.small_refactor_file],
646 | "relevant_context": ["process_everything", "UserData"],
647 | "issues_found": [
648 | {"type": "decompose", "severity": "high", "description": "God function needs decomposition"},
649 | {"type": "modernize", "severity": "medium", "description": "Use dataclass for UserData"},
650 | {"type": "codesmells", "severity": "medium", "description": "Replace print with logging"},
651 | ],
652 | "confidence": "complete", # Complete confidence should skip expert analysis
653 | "refactor_type": "codesmells",
654 | "model": "flash",
655 | },
656 | )
657 |
658 | if not response_certain:
659 | self.logger.error("Failed to test certain confidence with complete refactoring")
660 | return False
661 |
662 | response_certain_data = self._parse_refactor_response(response_certain)
663 | if not response_certain_data:
664 | return False
665 |
666 | # Validate certain confidence response - should skip expert analysis
667 | if response_certain_data.get("status") != "refactoring_analysis_complete_ready_for_implementation":
668 | self.logger.error(
669 | f"Expected status 'refactoring_analysis_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
670 | )
671 | return False
672 |
673 | if not response_certain_data.get("skip_expert_analysis"):
674 | self.logger.error("Expected skip_expert_analysis=true for complete confidence")
675 | return False
676 |
677 | expert_analysis = response_certain_data.get("expert_analysis", {})
678 | if expert_analysis.get("status") != "skipped_due_to_complete_refactoring_confidence":
679 | self.logger.error("Expert analysis should be skipped for complete confidence")
680 | return False
681 |
682 | self.logger.info(" ✅ Complete confidence behavior working correctly")
683 | return True
684 |
685 | except Exception as e:
686 | self.logger.error(f"Complete confidence test failed: {e}")
687 | return False
688 |
689 | def _test_context_aware_refactoring_file_embedding(self) -> bool:
690 | """Test context-aware file embedding optimization for refactoring workflow"""
691 | try:
692 | self.logger.info(" 1.5: Testing context-aware file embedding for refactoring")
693 |
694 | # Create multiple test files for context testing
695 | utils_content = """#!/usr/bin/env python3
696 | # Utility functions with refactoring opportunities
697 |
698 | def calculate_total(items):
699 | \"\"\"Calculate total with magic numbers\"\"\"
700 | total = 0
701 | for item in items:
702 | if item > 10: # Magic number
703 | total += item * 1.1 # Magic number for tax
704 | return total
705 |
706 | def format_output(data, format_type):
707 | \"\"\"Format output - duplicate logic\"\"\"
708 | if format_type == 'json':
709 | import json
710 | return json.dumps(data, ensure_ascii=False)
711 | elif format_type == 'csv':
712 | return ','.join(str(v) for v in data.values())
713 | else:
714 | return str(data)
715 | """
716 |
717 | helpers_content = """#!/usr/bin/env python3
718 | # Helper functions that could be modernized
719 |
720 | class DataContainer:
721 | \"\"\"Simple data container - could use dataclass\"\"\"
722 | def __init__(self, name, value, category):
723 | self.name = name
724 | self.value = value
725 | self.category = category
726 |
727 | def to_dict(self):
728 | return {
729 | 'name': self.name,
730 | 'value': self.value,
731 | 'category': self.category
732 | }
733 | """
734 |
735 | # Create test files
736 | utils_file = self.create_additional_test_file("utils.py", utils_content)
737 | helpers_file = self.create_additional_test_file("helpers.py", helpers_content)
738 |
739 | # Test 1: New conversation, intermediate step - should only reference files
740 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
741 | response1, continuation_id = self.call_mcp_tool(
742 | "refactor",
743 | {
744 | "step": "Starting refactoring analysis of utility modules",
745 | "step_number": 1,
746 | "total_steps": 3,
747 | "next_step_required": True, # Intermediate step
748 | "findings": "Initial analysis of utility and helper modules for refactoring opportunities",
749 | "files_checked": [utils_file, helpers_file],
750 | "relevant_files": [utils_file], # This should be referenced, not embedded
751 | "relevant_context": ["calculate_total"],
752 | "confidence": "incomplete",
753 | "refactor_type": "codesmells",
754 | "model": "flash",
755 | },
756 | )
757 |
758 | if not response1 or not continuation_id:
759 | self.logger.error("Failed to start context-aware file embedding test")
760 | return False
761 |
762 | response1_data = self._parse_refactor_response(response1)
763 | if not response1_data:
764 | return False
765 |
766 | # Check file context - should be reference_only for intermediate step
767 | file_context = response1_data.get("file_context", {})
768 | if file_context.get("type") != "reference_only":
769 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
770 | return False
771 |
772 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
773 | self.logger.error("Expected context optimization message for reference_only")
774 | return False
775 |
776 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
777 |
778 | # Test 2: Final step - should embed files for expert analysis
779 | self.logger.info(" 1.5.2: Final step (should embed files)")
780 | response2, _ = self.call_mcp_tool(
781 | "refactor",
782 | {
783 | "step": "Refactoring analysis complete - identified all opportunities",
784 | "step_number": 3,
785 | "total_steps": 3,
786 | "next_step_required": False, # Final step - should embed files
787 | "continuation_id": continuation_id,
788 | "findings": "Complete analysis: Found magic numbers in calculate_total, duplicate formatting logic, and modernization opportunity with DataContainer class that could use dataclass.",
789 | "files_checked": [utils_file, helpers_file],
790 | "relevant_files": [utils_file, helpers_file], # Should be fully embedded
791 | "relevant_context": ["calculate_total", "format_output", "DataContainer"],
792 | "issues_found": [
793 | {"type": "codesmells", "severity": "medium", "description": "Magic numbers in calculate_total"},
794 | {"type": "modernize", "severity": "low", "description": "DataContainer could use dataclass"},
795 | {"type": "codesmells", "severity": "low", "description": "Duplicate formatting logic"},
796 | ],
797 | "confidence": "partial", # Use partial to trigger expert analysis
798 | "model": "flash",
799 | },
800 | )
801 |
802 | if not response2:
803 | self.logger.error("Failed to complete to final step")
804 | return False
805 |
806 | response2_data = self._parse_refactor_response(response2)
807 | if not response2_data:
808 | return False
809 |
810 | # Check file context - should be fully_embedded for final step
811 | file_context2 = response2_data.get("file_context", {})
812 | if file_context2.get("type") != "fully_embedded":
813 | self.logger.error(
814 | f"Expected fully_embedded file context for final step, got: {file_context2.get('type')}"
815 | )
816 | return False
817 |
818 | if "Full file content embedded for expert analysis" not in file_context2.get("context_optimization", ""):
819 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
820 | return False
821 |
822 | self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
823 |
824 | # Verify expert analysis was called for final step (or files_required_to_continue)
825 | expected_statuses = ["calling_expert_analysis", "files_required_to_continue"]
826 | actual_status = response2_data.get("status")
827 | if actual_status not in expected_statuses:
828 | self.logger.error(f"Expected one of {expected_statuses}, got: {actual_status}")
829 | return False
830 |
831 | # Handle expert analysis based on status
832 | if actual_status == "calling_expert_analysis" and "expert_analysis" not in response2_data:
833 | self.logger.error("Expert analysis should be present in final step with calling_expert_analysis")
834 | return False
835 |
836 | self.logger.info(" ✅ Context-aware file embedding test for refactoring completed successfully")
837 | return True
838 |
839 | except Exception as e:
840 | self.logger.error(f"Context-aware refactoring file embedding test failed: {e}")
841 | return False
842 |
843 | def _test_different_refactor_types(self) -> bool:
844 | """Test different refactor types (decompose, modernize, organization)"""
845 | try:
846 | self.logger.info(" 1.6: Testing different refactor types")
847 |
848 | # Test decompose type
849 | self.logger.info(" 1.6.1: Testing decompose refactor type")
850 | response_decompose, _ = self.call_mcp_tool(
851 | "refactor",
852 | {
853 | "step": "Analyzing code for decomposition opportunities in large functions and classes",
854 | "step_number": 1,
855 | "total_steps": 1,
856 | "next_step_required": False,
857 | "findings": "Found large DataProcessorManager class that violates single responsibility principle and long process_user_data method that needs decomposition.",
858 | "files_checked": [self.refactor_file],
859 | "relevant_files": [self.refactor_file],
860 | "relevant_context": ["DataProcessorManager", "DataProcessorManager.process_user_data"],
861 | "issues_found": [
862 | {
863 | "type": "decompose",
864 | "severity": "critical",
865 | "description": "Large class with multiple responsibilities",
866 | },
867 | {
868 | "type": "decompose",
869 | "severity": "high",
870 | "description": "Long method doing validation, processing, and I/O",
871 | },
872 | ],
873 | "confidence": "complete",
874 | "refactor_type": "decompose",
875 | "model": "flash",
876 | },
877 | )
878 |
879 | if not response_decompose:
880 | self.logger.error("Failed to test decompose refactor type")
881 | return False
882 |
883 | response_decompose_data = self._parse_refactor_response(response_decompose)
884 |
885 | # Check that decompose type is properly tracked
886 | refactoring_status = response_decompose_data.get("refactoring_status", {})
887 | opportunities_by_type = refactoring_status.get("opportunities_by_type", {})
888 | if "decompose" not in opportunities_by_type:
889 | self.logger.error("Decompose opportunities not properly tracked")
890 | return False
891 |
892 | self.logger.info(" ✅ Decompose refactor type working correctly")
893 |
894 | # Test modernize type
895 | self.logger.info(" 1.6.2: Testing modernize refactor type")
896 | response_modernize, _ = self.call_mcp_tool(
897 | "refactor",
898 | {
899 | "step": "Analyzing code for modernization opportunities using newer Python features",
900 | "step_number": 1,
901 | "total_steps": 1,
902 | "next_step_required": False,
903 | "findings": "Found opportunities to use dataclasses, f-strings, pathlib, and proper logging instead of print statements.",
904 | "files_checked": [self.small_refactor_file],
905 | "relevant_files": [self.small_refactor_file],
906 | "relevant_context": ["UserData", "process_everything"],
907 | "issues_found": [
908 | {
909 | "type": "modernize",
910 | "severity": "medium",
911 | "description": "UserData class could use @dataclass decorator",
912 | },
913 | {
914 | "type": "modernize",
915 | "severity": "medium",
916 | "description": "Replace print statements with proper logging",
917 | },
918 | {"type": "modernize", "severity": "low", "description": "Use pathlib for file operations"},
919 | ],
920 | "confidence": "complete",
921 | "refactor_type": "modernize",
922 | "model": "flash",
923 | },
924 | )
925 |
926 | if not response_modernize:
927 | self.logger.error("Failed to test modernize refactor type")
928 | return False
929 |
930 | response_modernize_data = self._parse_refactor_response(response_modernize)
931 |
932 | # Check that modernize type is properly tracked
933 | refactoring_status = response_modernize_data.get("refactoring_status", {})
934 | opportunities_by_type = refactoring_status.get("opportunities_by_type", {})
935 | if "modernize" not in opportunities_by_type:
936 | self.logger.error("Modernize opportunities not properly tracked")
937 | return False
938 |
939 | self.logger.info(" ✅ Modernize refactor type working correctly")
940 |
941 | self.logger.info(" ✅ Different refactor types test completed successfully")
942 | return True
943 |
944 | except Exception as e:
945 | self.logger.error(f"Different refactor types test failed: {e}")
946 | return False
947 |
948 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
949 |         """Call an MCP tool in-process - override for refactor-specific response handling"""
950 | # Use in-process implementation to maintain conversation memory
951 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
952 |
953 | if not response_text:
954 | return None, None
955 |
956 | # Extract continuation_id from refactor response specifically
957 | continuation_id = self._extract_refactor_continuation_id(response_text)
958 |
959 | return response_text, continuation_id
960 |
961 | def _extract_refactor_continuation_id(self, response_text: str) -> Optional[str]:
962 | """Extract continuation_id from refactor response"""
963 | try:
964 | # Parse the response
965 | response_data = json.loads(response_text)
966 | return response_data.get("continuation_id")
967 |
968 | except json.JSONDecodeError as e:
969 | self.logger.debug(f"Failed to parse response for refactor continuation_id: {e}")
970 | return None
971 |
972 | def _parse_refactor_response(self, response_text: str) -> dict:
973 | """Parse refactor tool JSON response"""
974 | try:
975 | # Parse the response - it should be direct JSON
976 | return json.loads(response_text)
977 |
978 | except json.JSONDecodeError as e:
979 | self.logger.error(f"Failed to parse refactor response as JSON: {e}")
980 | self.logger.error(f"Response text: {response_text[:500]}...")
981 | return {}
982 |
983 | def _validate_refactoring_step_response(
984 | self,
985 | response_data: dict,
986 | expected_step: int,
987 | expected_total: int,
988 | expected_next_required: bool,
989 | expected_status: str,
990 | ) -> bool:
991 | """Validate a refactor investigation step response structure"""
992 | try:
993 | # Check status
994 | if response_data.get("status") != expected_status:
995 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
996 | return False
997 |
998 | # Check step number
999 | if response_data.get("step_number") != expected_step:
1000 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
1001 | return False
1002 |
1003 | # Check total steps
1004 | if response_data.get("total_steps") != expected_total:
1005 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
1006 | return False
1007 |
1008 | # Check next_step_required
1009 | if response_data.get("next_step_required") != expected_next_required:
1010 | self.logger.error(
1011 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
1012 | )
1013 | return False
1014 |
1015 | # Check refactoring_status exists
1016 | if "refactoring_status" not in response_data:
1017 | self.logger.error("Missing refactoring_status in response")
1018 | return False
1019 |
1020 | # Check next_steps guidance
1021 | if not response_data.get("next_steps"):
1022 | self.logger.error("Missing next_steps guidance in response")
1023 | return False
1024 |
1025 | return True
1026 |
1027 | except Exception as e:
1028 | self.logger.error(f"Error validating refactoring step response: {e}")
1029 | return False
1030 |
```
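Taken together, the two halves of this test file exercise the step-payload contract the refactor tool shares with the other workflow tools on this page: every request is a numbered step, intermediate steps (`next_step_required=True`) come back as `pause_for_refactoring_analysis` with a `continuation_id` to chain on, and the final step either triggers expert analysis or, at `confidence: "complete"`, skips it. The sketch below is a condensed illustration of that payload shape and gating rule; `build_step` and the literal continuation id are hypothetical and not part of the repository.

```python
import json

def build_step(step_text, number, total, final, confidence, continuation_id=None):
    """Hypothetical helper: build a workflow step payload in the shape used above."""
    payload = {
        "step": step_text,
        "step_number": number,
        "total_steps": total,
        "next_step_required": not final,  # False on the last step
        "confidence": confidence,
    }
    if continuation_id:
        # Steps after the first chain into the same conversation
        payload["continuation_id"] = continuation_id
    return payload

# Intermediate step: the tool should answer with status "pause_for_refactoring_analysis"
step1 = build_step("Initial scan of the module", 1, 2, final=False, confidence="incomplete")

# Final step: confidence "complete" makes the tool skip expert analysis entirely,
# while "partial" would produce a "calling_expert_analysis" response instead.
step2 = build_step("Analysis complete", 2, 2, final=True,
                   confidence="complete", continuation_id="abc-123")

skip_expert = not step2["next_step_required"] and step2["confidence"] == "complete"
print(json.dumps(step2, indent=2))
print("skip_expert_analysis expected:", skip_expert)  # True
```

The precommit test that follows drives the same state machine, except that its skip decision keys off `precommit_type: "internal"` rather than a confidence value.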
--------------------------------------------------------------------------------
/simulator_tests/test_precommitworkflow_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | PrecommitWorkflow Tool Validation Test
4 |
5 | Tests the precommit tool's capabilities using the new workflow architecture.
6 | This validates that the workflow-based pre-commit validation provides step-by-step
7 | analysis with proper investigation guidance and expert analysis integration.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class PrecommitWorkflowValidationTest(ConversationBaseTest):
17 | """Test precommit tool with new workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "precommit_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "PrecommitWorkflow tool validation with new workflow architecture"
26 |
27 | def run_test(self) -> bool:
28 | """Test precommit tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: PrecommitWorkflow tool validation (new architecture)")
34 |
35 | # Create test git repository structure with changes
36 | self._create_test_git_changes()
37 |
38 | # Test 1: Single validation session with multiple steps
39 | if not self._test_single_validation_session():
40 | return False
41 |
42 | # Test 2: Validation flow that requires refocusing
43 | if not self._test_validation_refocus_flow():
44 | return False
45 |
46 | # Test 3: Complete validation with expert analysis
47 | if not self._test_complete_validation_with_analysis():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Multi-step file context optimization
59 | if not self._test_multi_step_file_context():
60 | return False
61 |
62 | self.logger.info(" ✅ All precommit validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"PrecommitWorkflow validation test failed: {e}")
67 | return False
68 |
69 | def _create_test_git_changes(self):
70 | """Create test files simulating git changes for pre-commit validation"""
71 | # Create a new API endpoint with potential security issues
72 | new_api_code = """#!/usr/bin/env python3
73 | from flask import Flask, request, jsonify
74 | import sqlite3
75 | import os
76 |
77 | app = Flask(__name__)
78 |
79 | @app.route('/api/user/<user_id>', methods=['GET'])
80 | def get_user(user_id):
81 | \"\"\"Get user information by ID\"\"\"
82 | # Potential SQL injection vulnerability
83 | conn = sqlite3.connect('users.db')
84 | cursor = conn.cursor()
85 |
86 | # BUG: Direct string interpolation creates SQL injection risk
87 | query = f"SELECT * FROM users WHERE id = {user_id}"
88 | cursor.execute(query)
89 |
90 | result = cursor.fetchone()
91 | conn.close()
92 |
93 | if result:
94 | return jsonify({
95 | 'id': result[0],
96 | 'username': result[1],
97 | 'email': result[2],
98 | 'password_hash': result[3] # Security issue: exposing password hash
99 | })
100 | else:
101 | return jsonify({'error': 'User not found'}), 404
102 |
103 | @app.route('/api/admin/users', methods=['GET'])
104 | def list_all_users():
105 | \"\"\"Admin endpoint to list all users\"\"\"
106 | # Missing authentication check
107 | conn = sqlite3.connect('users.db')
108 | cursor = conn.cursor()
109 | cursor.execute("SELECT id, username, email FROM users")
110 |
111 | users = []
112 | for row in cursor.fetchall():
113 | users.append({
114 | 'id': row[0],
115 | 'username': row[1],
116 | 'email': row[2]
117 | })
118 |
119 | conn.close()
120 | return jsonify(users)
121 |
122 | if __name__ == '__main__':
123 | # Debug mode in production is a security risk
124 | app.run(debug=True, host='0.0.0.0')
125 | """
126 |
127 | # Create configuration file with issues
128 | config_code = """#!/usr/bin/env python3
129 | import os
130 |
131 | # Database configuration
132 | DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///users.db')
133 |
134 | # Security settings
135 | SECRET_KEY = "hardcoded-secret-key-123" # Security issue: hardcoded secret
136 | DEBUG_MODE = True # Should be environment-based
137 |
138 | # API settings
139 | API_RATE_LIMIT = 1000  # Very high - effectively no rate limiting
140 | MAX_FILE_UPLOAD = 50 * 1024 * 1024 # 50MB - quite large
141 |
142 | # Missing important security headers configuration
143 | CORS_ORIGINS = "*" # Security issue: allows all origins
144 | """
145 |
146 | # Create test files
147 | self.api_file = self.create_additional_test_file("api_endpoints.py", new_api_code)
148 | self.config_file = self.create_additional_test_file("config.py", config_code)
149 | self.logger.info(f" ✅ Created test files: {self.api_file}, {self.config_file}")
150 |
151 | # Create change description
152 | change_description = """COMMIT DESCRIPTION:
153 | Added new user API endpoints and configuration for user management system.
154 |
155 | CHANGES MADE:
156 | - Added GET /api/user/<user_id> endpoint to retrieve user information
157 | - Added GET /api/admin/users endpoint for admin user listing
158 | - Added configuration file with database and security settings
159 | - Set up Flask application with basic routing
160 |
161 | REQUIREMENTS:
162 | - User data should be retrievable by ID
163 | - Admin should be able to list all users
164 | - System should be configurable via environment variables
165 | - Security should be properly implemented
166 | """
167 |
168 | self.changes_file = self.create_additional_test_file("commit_description.txt", change_description)
169 | self.logger.info(f" ✅ Created change description: {self.changes_file}")
170 |
171 | def _test_single_validation_session(self) -> bool:
172 | """Test a complete validation session with multiple steps"""
173 | try:
174 | self.logger.info(" 1.1: Testing single validation session")
175 |
176 | # Step 1: Start validation
177 | self.logger.info(" 1.1.1: Step 1 - Initial validation plan")
178 | response1, continuation_id = self.call_mcp_tool(
179 | "precommit",
180 | {
181 | "step": "I need to perform comprehensive pre-commit validation for new API endpoints. Let me start by analyzing the changes and identifying potential issues.",
182 | "step_number": 1,
183 | "total_steps": 4,
184 | "next_step_required": True,
185 | "findings": "New user API endpoints and configuration added. Need to examine for security, performance, and best practices.",
186 | "files_checked": [self.changes_file],
187 | "relevant_files": [self.changes_file],
188 | "path": self.test_dir, # Required for step 1
189 | "review_type": "full",
190 | "severity_filter": "all",
191 | },
192 | )
193 |
194 | if not response1 or not continuation_id:
195 | self.logger.error("Failed to get initial validation response")
196 | return False
197 |
198 | # Parse and validate JSON response
199 | response1_data = self._parse_precommit_response(response1)
200 | if not response1_data:
201 | return False
202 |
203 | # Validate step 1 response structure - expect pause_for_validation for next_step_required=True
204 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_validation"):
205 | return False
206 |
207 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
208 |
209 | # Step 2: Examine the code for issues
210 | self.logger.info(" 1.1.2: Step 2 - Code examination")
211 | response2, _ = self.call_mcp_tool(
212 | "precommit",
213 | {
214 | "step": "Now examining the API endpoint implementation and configuration for security vulnerabilities and best practices violations.",
215 | "step_number": 2,
216 | "total_steps": 4,
217 | "next_step_required": True,
218 | "findings": "Found multiple critical security issues: SQL injection vulnerability in get_user(), hardcoded secrets in config, missing authentication, and password hash exposure.",
219 | "files_checked": [self.changes_file, self.api_file, self.config_file],
220 | "relevant_files": [self.api_file, self.config_file],
221 | "relevant_context": ["get_user", "list_all_users"],
222 | "issues_found": [
223 | {"severity": "critical", "description": "SQL injection vulnerability in user lookup"},
224 | {"severity": "high", "description": "Hardcoded secret key in configuration"},
225 | {"severity": "high", "description": "Password hash exposed in API response"},
226 | {"severity": "medium", "description": "Missing authentication on admin endpoint"},
227 | ],
228 | # Assessment field removed - using precommit_type instead
229 | # Confidence field removed - using precommit_type instead
230 | "continuation_id": continuation_id,
231 | },
232 | )
233 |
234 | if not response2:
235 | self.logger.error("Failed to continue validation to step 2")
236 | return False
237 |
238 | response2_data = self._parse_precommit_response(response2)
239 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_validation"):
240 | return False
241 |
242 | # Check validation status tracking
243 | validation_status = response2_data.get("validation_status", {})
244 | if validation_status.get("files_checked", 0) < 3:
245 | self.logger.error("Files checked count not properly tracked")
246 | return False
247 |
248 | if validation_status.get("issues_identified", 0) != 4:
249 | self.logger.error("Issues found not properly tracked")
250 | return False
251 |
252 | if validation_status.get("precommit_type") != "external":
253 | self.logger.error("Precommit type not properly tracked")
254 | return False
255 |
256 | self.logger.info(" ✅ Step 2 successful with proper tracking")
257 |
258 | # Store continuation_id for next test
259 | self.validation_continuation_id = continuation_id
260 | return True
261 |
262 | except Exception as e:
263 | self.logger.error(f"Single validation session test failed: {e}")
264 | return False
265 |
266 | def _test_validation_refocus_flow(self) -> bool:
267 | """Test validation workflow that requires refocusing to revise findings"""
268 | try:
269 | self.logger.info(" 1.2: Testing validation refocus workflow")
270 |
271 |             # Start a new validation for testing refocus behavior
272 | self.logger.info(" 1.2.1: Start validation for refocus test")
273 | response1, continuation_id = self.call_mcp_tool(
274 | "precommit",
275 | {
276 | "step": "Validating database connection optimization changes",
277 | "step_number": 1,
278 | "total_steps": 4,
279 | "next_step_required": True,
280 | "findings": "Initial analysis shows database connection pooling implementation",
281 | "files_checked": ["/db/connection.py"],
282 | "relevant_files": ["/db/connection.py"],
283 | "path": self.test_dir,
284 | },
285 | )
286 |
287 | if not response1 or not continuation_id:
288 | self.logger.error("Failed to start refocus test validation")
289 | return False
290 |
291 | # Step 2: Wrong direction
292 | self.logger.info(" 1.2.2: Step 2 - Wrong validation focus")
293 | response2, _ = self.call_mcp_tool(
294 | "precommit",
295 | {
296 | "step": "Focusing on connection pool size optimization",
297 | "step_number": 2,
298 | "total_steps": 4,
299 | "next_step_required": True,
300 |                     "findings": "Connection pool configuration seems reasonable, might be looking in the wrong place",
301 | "files_checked": ["/db/connection.py", "/config/database.py"],
302 | "relevant_files": [],
303 | # Assessment fields removed - using precommit_type instead
304 | "continuation_id": continuation_id,
305 | },
306 | )
307 |
308 | if not response2:
309 | self.logger.error("Failed to continue to step 2")
310 | return False
311 |
312 | # Step 3: Shift investigation focus
313 | self.logger.info(" 1.2.3: Step 3 - Refocus and revise approach")
314 | response3, _ = self.call_mcp_tool(
315 | "precommit",
316 | {
317 | "step": "Refocusing - the issue might not be database configuration. Let me examine the actual SQL queries and data access patterns instead.",
318 | "step_number": 3,
319 | "total_steps": 4,
320 | "next_step_required": True,
321 | "findings": "Found inefficient N+1 query pattern in user data loading causing performance issues",
322 | "files_checked": ["/models/user.py"],
323 | "relevant_files": ["/models/user.py"],
324 | "relevant_context": ["User.load_profile"],
325 | "issues_found": [
326 | {"severity": "medium", "description": "N+1 query pattern in user profile loading"}
327 | ],
328 | # Assessment fields removed - using precommit_type instead
329 | "continuation_id": continuation_id,
330 | },
331 | )
332 |
333 | if not response3:
334 | self.logger.error("Failed to refocus")
335 | return False
336 |
337 | response3_data = self._parse_precommit_response(response3)
338 | if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_validation"):
339 | return False
340 |
341 | self.logger.info(" ✅ Refocus flow working correctly")
342 | return True
343 |
344 | except Exception as e:
345 | self.logger.error(f"Refocus test failed: {e}")
346 | return False
347 |
348 | def _test_complete_validation_with_analysis(self) -> bool:
349 | """Test complete validation ending with expert analysis"""
350 | try:
351 | self.logger.info(" 1.3: Testing complete validation with expert analysis")
352 |
353 | # Use the continuation from first test
354 | continuation_id = getattr(self, "validation_continuation_id", None)
355 | if not continuation_id:
356 | # Start fresh if no continuation available
357 | self.logger.info(" 1.3.0: Starting fresh validation")
358 | response0, continuation_id = self.call_mcp_tool(
359 | "precommit",
360 | {
361 | "step": "Validating the security fixes for API endpoints",
362 | "step_number": 1,
363 | "total_steps": 2,
364 | "next_step_required": True,
365 | "findings": "Found critical security vulnerabilities in API implementation",
366 | "files_checked": [self.api_file],
367 | "relevant_files": [self.api_file],
368 | "relevant_context": ["get_user", "list_all_users"],
369 | "issues_found": [{"severity": "critical", "description": "SQL injection vulnerability"}],
370 | "path": self.test_dir,
371 | },
372 | )
373 | if not response0 or not continuation_id:
374 | self.logger.error("Failed to start fresh validation")
375 | return False
376 |
377 | # Final step - trigger expert analysis
378 | self.logger.info(" 1.3.1: Final step - complete validation")
379 | response_final, _ = self.call_mcp_tool(
380 | "precommit",
381 | {
382 | "step": "Validation complete. I have identified all critical security issues and missing safeguards in the new API endpoints.",
383 | "step_number": 2,
384 | "total_steps": 2,
385 | "next_step_required": False, # Final step - triggers expert analysis
386 | "findings": "Comprehensive analysis complete: SQL injection, hardcoded secrets, missing authentication, password exposure, and insecure defaults all identified with specific fixes needed.",
387 | "files_checked": [self.api_file, self.config_file],
388 | "relevant_files": [self.api_file, self.config_file],
389 | "relevant_context": ["get_user", "list_all_users", "SECRET_KEY", "DEBUG_MODE"],
390 | "issues_found": [
391 | {"severity": "critical", "description": "SQL injection vulnerability in user lookup query"},
392 | {"severity": "high", "description": "Hardcoded secret key exposes application security"},
393 | {"severity": "high", "description": "Password hash exposed in API response"},
394 | {"severity": "medium", "description": "Missing authentication on admin endpoint"},
395 | {"severity": "medium", "description": "Debug mode enabled in production configuration"},
396 | ],
397 | # Confidence field removed - using precommit_type instead
398 | "continuation_id": continuation_id,
399 | "model": "flash", # Use flash for expert analysis
400 | },
401 | )
402 |
403 | if not response_final:
404 | self.logger.error("Failed to complete validation")
405 | return False
406 |
407 | response_final_data = self._parse_precommit_response(response_final)
408 | if not response_final_data:
409 | return False
410 |
411 | # Validate final response structure - expect calling_expert_analysis for next_step_required=False
412 | if response_final_data.get("status") != "calling_expert_analysis":
413 | self.logger.error(
414 | f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
415 | )
416 | return False
417 |
418 | if not response_final_data.get("validation_complete"):
419 | self.logger.error("Expected validation_complete=true for final step")
420 | return False
421 |
422 | # Check for expert analysis
423 | if "expert_analysis" not in response_final_data:
424 | self.logger.error("Missing expert_analysis in final response")
425 | return False
426 |
427 | expert_analysis = response_final_data.get("expert_analysis", {})
428 |
429 | # Check for expected analysis content (checking common patterns)
430 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
431 |
432 | # Look for security issue identification
433 | security_indicators = ["sql", "injection", "security", "hardcoded", "secret", "authentication"]
434 | found_indicators = sum(1 for indicator in security_indicators if indicator in analysis_text)
435 |
436 | if found_indicators >= 3:
437 | self.logger.info(" ✅ Expert analysis identified security issues correctly")
438 | else:
439 | self.logger.warning(
440 | f" ⚠️ Expert analysis may not have fully identified security issues (found {found_indicators}/6 indicators)"
441 | )
442 |
443 | # Check complete validation summary
444 | if "complete_validation" not in response_final_data:
445 | self.logger.error("Missing complete_validation in final response")
446 | return False
447 |
448 | complete_validation = response_final_data["complete_validation"]
449 | if not complete_validation.get("relevant_context"):
450 | self.logger.error("Missing relevant context in complete validation")
451 | return False
452 |
453 | if "get_user" not in complete_validation["relevant_context"]:
454 | self.logger.error("Expected function not found in validation summary")
455 | return False
456 |
457 | self.logger.info(" ✅ Complete validation with expert analysis successful")
458 | return True
459 |
460 | except Exception as e:
461 | self.logger.error(f"Complete validation test failed: {e}")
462 | return False
463 |
464 | def _test_certain_confidence(self) -> bool:
465 | """Test certain confidence behavior - should skip expert analysis"""
466 | try:
467 | self.logger.info(" 1.4: Testing certain confidence behavior")
468 |
469 | # Test certain confidence - should skip expert analysis
470 | self.logger.info(" 1.4.1: Certain confidence validation")
471 | response_certain, _ = self.call_mcp_tool(
472 | "precommit",
473 | {
474 | "step": "I have confirmed all security issues with 100% certainty: SQL injection, hardcoded secrets, and missing authentication.",
475 | "step_number": 1,
476 | "total_steps": 1,
477 | "next_step_required": False, # Final step
478 | "findings": "All critical issues identified: parameterized queries needed, environment variables for secrets, authentication middleware required, and debug mode must be disabled for production.",
479 | "files_checked": [self.api_file, self.config_file],
480 | "relevant_files": [self.api_file, self.config_file],
481 | "relevant_context": ["get_user", "list_all_users"],
482 | "issues_found": [
483 | {
484 | "severity": "critical",
485 | "description": "SQL injection vulnerability - fix with parameterized queries",
486 | },
487 | {"severity": "high", "description": "Hardcoded secret - use environment variables"},
488 | {"severity": "medium", "description": "Missing authentication - add middleware"},
489 | ],
490 | "precommit_type": "internal", # This should skip expert analysis
491 | "path": self.test_dir,
492 | "model": "flash",
493 | },
494 | )
495 |
496 | if not response_certain:
497 | self.logger.error("Failed to test certain confidence")
498 | return False
499 |
500 | response_certain_data = self._parse_precommit_response(response_certain)
501 | if not response_certain_data:
502 | return False
503 |
504 | # Validate certain confidence response - should skip expert analysis
505 | if response_certain_data.get("status") != "validation_complete_ready_for_commit":
506 | self.logger.error(
507 | f"Expected status 'validation_complete_ready_for_commit', got '{response_certain_data.get('status')}'"
508 | )
509 | return False
510 |
511 | if not response_certain_data.get("skip_expert_analysis"):
512 | self.logger.error("Expected skip_expert_analysis=true for certain confidence")
513 | return False
514 |
515 | expert_analysis = response_certain_data.get("expert_analysis", {})
516 | if expert_analysis.get("status") != "skipped_due_to_internal_analysis_type":
517 | self.logger.error("Expert analysis should be skipped for certain confidence")
518 | return False
519 |
520 | self.logger.info(" ✅ Certain confidence behavior working correctly")
521 | return True
522 |
523 | except Exception as e:
524 | self.logger.error(f"Certain confidence test failed: {e}")
525 | return False
526 |
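# Editor's note - for "precommit_type": "internal", the assertions above imply
# a response of roughly this shape (values illustrative):
#   {
#     "status": "validation_complete_ready_for_commit",
#     "skip_expert_analysis": true,
#     "expert_analysis": {"status": "skipped_due_to_internal_analysis_type"}
#   }
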
527 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
528 | """Call an MCP tool in-process - override for precommit-specific response handling"""
529 | # Use in-process implementation to maintain conversation memory
530 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
531 |
532 | if not response_text:
533 | return None, None
534 |
535 | # Extract the continuation_id from the precommit-specific response format
536 | continuation_id = self._extract_precommit_continuation_id(response_text)
537 |
538 | return response_text, continuation_id
539 |
540 | def _extract_precommit_continuation_id(self, response_text: str) -> Optional[str]:
541 | """Extract continuation_id from precommit response"""
542 | try:
543 | # Parse the response
544 | response_data = json.loads(response_text)
545 | return response_data.get("continuation_id")
546 |
547 | except json.JSONDecodeError as e:
548 | self.logger.debug(f"Failed to parse response for precommit continuation_id: {e}")
549 | return None
550 |
551 | def _parse_precommit_response(self, response_text: str) -> dict:
552 | """Parse precommit tool JSON response"""
553 | try:
554 | # Parse the response - it should be direct JSON
555 | return json.loads(response_text)
556 |
557 | except json.JSONDecodeError as e:
558 | self.logger.error(f"Failed to parse precommit response as JSON: {e}")
559 | self.logger.error(f"Response text: {response_text[:500]}...")
560 | return {}
561 |
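# Editor's note - usage sketch for the two helpers above (values hypothetical):
#   response_text = '{"status": "...", "continuation_id": "abc-123"}'
#   _parse_precommit_response(response_text)          -> the parsed dict
#   _extract_precommit_continuation_id(response_text) -> "abc-123"
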
562 | def _validate_step_response(
563 | self,
564 | response_data: dict,
565 | expected_step: int,
566 | expected_total: int,
567 | expected_next_required: bool,
568 | expected_status: str,
569 | ) -> bool:
570 | """Validate a precommit validation step response structure"""
571 | try:
572 | # Check status
573 | if response_data.get("status") != expected_status:
574 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
575 | return False
576 |
577 | # Check step number
578 | if response_data.get("step_number") != expected_step:
579 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
580 | return False
581 |
582 | # Check total steps
583 | if response_data.get("total_steps") != expected_total:
584 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
585 | return False
586 |
587 | # Check next_step_required
588 | if response_data.get("next_step_required") != expected_next_required:
589 | self.logger.error(
590 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
591 | )
592 | return False
593 |
594 | # Check validation_status exists
595 | if "validation_status" not in response_data:
596 | self.logger.error("Missing validation_status in response")
597 | return False
598 |
599 | # Check next_steps guidance
600 | if not response_data.get("next_steps"):
601 | self.logger.error("Missing next_steps guidance in response")
602 | return False
603 |
604 | return True
605 |
606 | except Exception as e:
607 | self.logger.error(f"Error validating step response: {e}")
608 | return False
609 |
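# Editor's note - example call (argument values are illustrative; each test
# passes the status it expects for that step):
#   self._validate_step_response(
#       response_data, expected_step=1, expected_total=3,
#       expected_next_required=True, expected_status="...",
#   )
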
610 | def _test_context_aware_file_embedding(self) -> bool:
611 | """Test context-aware file embedding optimization"""
612 | try:
613 | self.logger.info(" 1.5: Testing context-aware file embedding")
614 |
615 | # Create multiple test files for context testing
616 | auth_file_content = """#!/usr/bin/env python3
617 | from functools import wraps
618 | from flask import request, jsonify
619 |
620 | def require_auth(f):
621 | \"\"\"Authentication decorator\"\"\"
622 | @wraps(f)
623 | def decorated_function(*args, **kwargs):
624 | token = request.headers.get('Authorization')
625 | if not token:
626 | return jsonify({'error': 'No token provided'}), 401
627 |
628 | # Validate token here
629 | if not validate_token(token):
630 | return jsonify({'error': 'Invalid token'}), 401
631 |
632 | return f(*args, **kwargs)
633 | return decorated_function
634 |
635 | def validate_token(token):
636 | \"\"\"Validate authentication token\"\"\"
637 | # Token validation logic
638 | return token.startswith('Bearer ')
639 | """
640 |
641 | middleware_file_content = """#!/usr/bin/env python3
642 | from flask import Flask, request, g
643 | import time
644 |
645 | def add_security_headers(app):
646 | \"\"\"Add security headers to all responses\"\"\"
647 | @app.after_request
648 | def security_headers(response):
649 | response.headers['X-Content-Type-Options'] = 'nosniff'
650 | response.headers['X-Frame-Options'] = 'DENY'
651 | response.headers['X-XSS-Protection'] = '1; mode=block'
652 | return response
653 |
654 | def rate_limiting_middleware(app):
655 | \"\"\"Basic rate limiting\"\"\"
656 | @app.before_request
657 | def limit_remote_addr():
658 | # Simple rate limiting logic
659 | pass
660 | """
661 |
662 | # Create test files
663 | auth_file = self.create_additional_test_file("auth.py", auth_file_content)
664 | middleware_file = self.create_additional_test_file("middleware.py", middleware_file_content)
665 |
666 | # Test 1: New conversation, intermediate step - should only reference files
667 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
668 | response1, continuation_id = self.call_mcp_tool(
669 | "precommit",
670 | {
671 | "step": "Starting validation of new authentication and security middleware",
672 | "step_number": 1,
673 | "total_steps": 3,
674 | "next_step_required": True, # Intermediate step
675 | "findings": "Initial analysis of authentication and middleware components",
676 | "files_checked": [auth_file, middleware_file],
677 | "relevant_files": [auth_file], # This should be referenced, not embedded
678 | "relevant_context": ["require_auth"],
679 | # Assessment fields removed - using precommit_type instead
680 | "path": self.test_dir,
681 | "model": "flash",
682 | },
683 | )
684 |
685 | if not response1 or not continuation_id:
686 | self.logger.error("Failed to start context-aware file embedding test")
687 | return False
688 |
689 | response1_data = self._parse_precommit_response(response1)
690 | if not response1_data:
691 | return False
692 |
693 | # Check file context - should be reference_only for intermediate step
694 | file_context = response1_data.get("file_context", {})
695 | if file_context.get("type") != "reference_only":
696 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
697 | return False
698 |
699 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
700 | self.logger.error("Expected context optimization message for reference_only")
701 | return False
702 |
703 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
704 |
705 | # Test 2: Intermediate step with continuation - should still only reference
706 | self.logger.info(" 1.5.2: Intermediate step with continuation (should reference only)")
707 | response2, _ = self.call_mcp_tool(
708 | "precommit",
709 | {
710 | "step": "Continuing validation with detailed security analysis",
711 | "step_number": 2,
712 | "total_steps": 3,
713 | "next_step_required": True, # Still intermediate
714 | "continuation_id": continuation_id,
715 | "findings": "Found potential issues in token validation and missing security headers",
716 | "files_checked": [auth_file, middleware_file],
717 | "relevant_files": [auth_file, middleware_file], # Both files referenced
718 | "relevant_context": ["require_auth", "validate_token", "add_security_headers"],
719 | "issues_found": [
720 | {"severity": "medium", "description": "Basic token validation might be insufficient"}
721 | ],
722 | # Assessment fields removed - using precommit_type instead
723 | "model": "flash",
724 | },
725 | )
726 |
727 | if not response2:
728 | self.logger.error("Failed to continue to step 2")
729 | return False
730 |
731 | response2_data = self._parse_precommit_response(response2)
732 | if not response2_data:
733 | return False
734 |
735 | # Check file context - should still be reference_only
736 | file_context2 = response2_data.get("file_context", {})
737 | if file_context2.get("type") != "reference_only":
738 | self.logger.error(f"Expected reference_only file context for step 2, got: {file_context2.get('type')}")
739 | return False
740 |
741 | # Should include reference note
742 | if not file_context2.get("note"):
743 | self.logger.error("Expected file reference note for intermediate step")
744 | return False
745 |
746 | reference_note = file_context2.get("note", "")
747 | if "auth.py" not in reference_note or "middleware.py" not in reference_note:
748 | self.logger.error("File reference note should mention both files")
749 | return False
750 |
751 | self.logger.info(" ✅ Intermediate step with continuation correctly uses reference_only")
752 |
753 | # Test 3: Final step - should embed files for expert analysis
754 | self.logger.info(" 1.5.3: Final step (should embed files)")
755 | response3, _ = self.call_mcp_tool(
756 | "precommit",
757 | {
758 | "step": "Validation complete - identified security gaps and improvement areas",
759 | "step_number": 3,
760 | "total_steps": 3,
761 | "next_step_required": False, # Final step - should embed files
762 | "continuation_id": continuation_id,
763 | "findings": "Security implementation has several gaps: token validation is basic, missing CSRF protection, and rate limiting is not implemented",
764 | "files_checked": [auth_file, middleware_file],
765 | "relevant_files": [auth_file, middleware_file], # Should be fully embedded
766 | "relevant_context": ["require_auth", "validate_token", "add_security_headers"],
767 | "issues_found": [
768 | {"severity": "medium", "description": "Token validation needs strengthening"},
769 | {"severity": "low", "description": "Missing CSRF protection"},
770 | {"severity": "low", "description": "Rate limiting not implemented"},
771 | ],
772 | # Assessment field removed - using precommit_type instead
773 | # Confidence field removed - using precommit_type instead
774 | "model": "flash",
775 | },
776 | )
777 |
778 | if not response3:
779 | self.logger.error("Failed to complete to final step")
780 | return False
781 |
782 | response3_data = self._parse_precommit_response(response3)
783 | if not response3_data:
784 | return False
785 |
786 | # Check file context - should be fully_embedded for final step
787 | file_context3 = response3_data.get("file_context", {})
788 | if file_context3.get("type") != "fully_embedded":
789 | self.logger.error(
790 | f"Expected fully_embedded file context for final step, got: {file_context3.get('type')}"
791 | )
792 | return False
793 |
794 | if "Full file content embedded for expert analysis" not in file_context3.get("context_optimization", ""):
795 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
796 | return False
797 |
798 | # Should show files embedded count
799 | files_embedded = file_context3.get("files_embedded", 0)
800 | if files_embedded == 0:
801 | # This is OK - files might already be in conversation history
802 | self.logger.info(
803 | " ℹ️ Files embedded count is 0 - files already in conversation history (smart deduplication)"
804 | )
805 | else:
806 | self.logger.info(f" ✅ Files embedded count: {files_embedded}")
807 |
808 | self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
809 |
810 | # Verify expert analysis was called for final step
811 | if response3_data.get("status") != "calling_expert_analysis":
812 | self.logger.error("Final step should trigger expert analysis")
813 | return False
814 |
815 | if "expert_analysis" not in response3_data:
816 | self.logger.error("Expert analysis should be present in final step")
817 | return False
818 |
819 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
820 | return True
821 |
822 | except Exception as e:
823 | self.logger.error(f"Context-aware file embedding test failed: {e}")
824 | return False
825 |
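# Editor's note - summary of the file_context contract exercised above
# (values illustrative, strings taken from the assertions):
#   intermediate step -> {"type": "reference_only",
#                         "context_optimization": "Files referenced but not embedded...",
#                         "note": "auth.py, middleware.py ..."}
#   final step        -> {"type": "fully_embedded",
#                         "context_optimization": "Full file content embedded for expert analysis",
#                         "files_embedded": 2}
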
826 | def _test_multi_step_file_context(self) -> bool:
827 | """Test multi-step workflow with proper file context transitions"""
828 | try:
829 | self.logger.info(" 1.6: Testing multi-step file context optimization")
830 |
831 | # Create a complex scenario with multiple files for pre-commit validation
832 | database_content = """#!/usr/bin/env python3
833 | import sqlite3
834 | import os
835 | from contextlib import contextmanager
836 |
837 | class DatabaseManager:
838 | def __init__(self):
839 | self.db_path = os.getenv('DATABASE_PATH', 'app.db')
840 |
841 | @contextmanager
842 | def get_connection(self):
843 | \"\"\"Get database connection with proper cleanup\"\"\"
844 | conn = None
845 | try:
846 | conn = sqlite3.connect(self.db_path)
847 | yield conn
848 | finally:
849 | if conn:
850 | conn.close()
851 |
852 | def create_user(self, username, email, password_hash):
853 | \"\"\"Create a new user\"\"\"
854 | with self.get_connection() as conn:
855 | cursor = conn.cursor()
856 | # Proper parameterized query
857 | cursor.execute(
858 | "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
859 | (username, email, password_hash)
860 | )
861 | conn.commit()
862 | return cursor.lastrowid
863 | """
864 |
865 | tests_content = """#!/usr/bin/env python3
866 | import unittest
867 | from unittest.mock import patch, MagicMock
868 | from database_manager import DatabaseManager
869 |
870 | class TestDatabaseManager(unittest.TestCase):
871 | def setUp(self):
872 | self.db_manager = DatabaseManager()
873 |
874 | @patch('sqlite3.connect')
875 | def test_create_user(self, mock_connect):
876 | \"\"\"Test user creation\"\"\"
877 | mock_conn = MagicMock()
878 | mock_cursor = MagicMock()
879 | mock_cursor.lastrowid = 123
880 | mock_conn.cursor.return_value = mock_cursor
881 | mock_connect.return_value = mock_conn
882 |
883 | user_id = self.db_manager.create_user('testuser', '[email protected]', 'hashed_password')
884 |
885 | self.assertEqual(user_id, 123)
886 | mock_cursor.execute.assert_called_once_with(
887 | "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
888 | ('testuser', '[email protected]', 'hashed_password')
889 | )
890 |
891 | if __name__ == '__main__':
892 | unittest.main()
893 | """
894 |
895 | # Create test files
896 | db_file = self.create_additional_test_file("database_manager.py", database_content)
897 | test_file = self.create_additional_test_file("test_database.py", tests_content)
898 |
899 | # Step 1: Start validation (new conversation)
900 | self.logger.info(" 1.6.1: Step 1 - Start validation")
901 | response1, continuation_id = self.call_mcp_tool(
902 | "precommit",
903 | {
904 | "step": "Validating new database manager implementation and corresponding tests",
905 | "step_number": 1,
906 | "total_steps": 4,
907 | "next_step_required": True,
908 | "findings": "New database manager with connection handling and user creation functionality",
909 | "files_checked": [db_file],
910 | "relevant_files": [db_file],
911 | "relevant_context": [],
912 | # Assessment fields removed - using precommit_type instead
913 | "path": self.test_dir,
914 | "model": "flash",
915 | },
916 | )
917 |
918 | if not response1 or not continuation_id:
919 | self.logger.error("Failed to start multi-step file context test")
920 | return False
921 |
922 | response1_data = self._parse_precommit_response(response1)
923 |
924 | # Validate step 1 - should use reference_only
925 | file_context1 = response1_data.get("file_context", {})
926 | if file_context1.get("type") != "reference_only":
927 | self.logger.error("Step 1 should use reference_only file context")
928 | return False
929 |
930 | self.logger.info(" ✅ Step 1: reference_only file context")
931 |
932 | # Step 2: Expand validation
933 | self.logger.info(" 1.6.2: Step 2 - Expand validation")
934 | response2, _ = self.call_mcp_tool(
935 | "precommit",
936 | {
937 | "step": "Found good database implementation - now examining test coverage",
938 | "step_number": 2,
939 | "total_steps": 4,
940 | "next_step_required": True,
941 | "continuation_id": continuation_id,
942 | "findings": "Database manager uses proper parameterized queries and context managers. Test file provides good coverage with mocking.",
943 | "files_checked": [db_file, test_file],
944 | "relevant_files": [db_file, test_file],
945 | "relevant_context": ["DatabaseManager.create_user", "TestDatabaseManager.test_create_user"],
946 | # Assessment fields removed - using precommit_type instead
947 | "model": "flash",
948 | },
949 | )
950 |
951 | if not response2:
952 | self.logger.error("Failed to continue to step 2")
953 | return False
954 |
955 | response2_data = self._parse_precommit_response(response2)
956 |
957 | # Validate step 2 - should still use reference_only
958 | file_context2 = response2_data.get("file_context", {})
959 | if file_context2.get("type") != "reference_only":
960 | self.logger.error("Step 2 should use reference_only file context")
961 | return False
962 |
963 | # Should reference both files
964 | reference_note = file_context2.get("note", "")
965 | if "database_manager.py" not in reference_note or "test_database.py" not in reference_note:
966 | self.logger.error("Step 2 should reference both files in note")
967 | return False
968 |
969 | self.logger.info(" ✅ Step 2: reference_only file context with multiple files")
970 |
971 | # Step 3: Deep analysis
972 | self.logger.info(" 1.6.3: Step 3 - Deep analysis")
973 | response3, _ = self.call_mcp_tool(
974 | "precommit",
975 | {
976 | "step": "Performing comprehensive security and best practices analysis",
977 | "step_number": 3,
978 | "total_steps": 4,
979 | "next_step_required": True,
980 | "continuation_id": continuation_id,
981 | "findings": "Code follows security best practices: parameterized queries prevent SQL injection, proper resource cleanup with context managers, environment-based configuration.",
982 | "files_checked": [db_file, test_file],
983 | "relevant_files": [db_file, test_file],
984 | "relevant_context": ["DatabaseManager.get_connection", "DatabaseManager.create_user"],
985 | "issues_found": [], # No issues found
986 | # Assessment field removed - using precommit_type instead
987 | # Confidence field removed - using precommit_type instead
988 | "model": "flash",
989 | },
990 | )
991 |
992 | if not response3:
993 | self.logger.error("Failed to continue to step 3")
994 | return False
995 |
996 | response3_data = self._parse_precommit_response(response3)
997 |
998 | # Validate step 3 - should still use reference_only
999 | file_context3 = response3_data.get("file_context", {})
1000 | if file_context3.get("type") != "reference_only":
1001 | self.logger.error("Step 3 should use reference_only file context")
1002 | return False
1003 |
1004 | self.logger.info(" ✅ Step 3: reference_only file context")
1005 |
1006 | # Step 4: Final validation with expert consultation
1007 | self.logger.info(" 1.6.4: Step 4 - Final step with expert analysis")
1008 | response4, _ = self.call_mcp_tool(
1009 | "precommit",
1010 | {
1011 | "step": "Validation complete - code is ready for commit",
1012 | "step_number": 4,
1013 | "total_steps": 4,
1014 | "next_step_required": False, # Final step - should embed files
1015 | "continuation_id": continuation_id,
1016 | "findings": "Comprehensive validation complete: secure implementation with parameterized queries, proper resource management, good test coverage, and no security vulnerabilities identified.",
1017 | "files_checked": [db_file, test_file],
1018 | "relevant_files": [db_file, test_file],
1019 | "relevant_context": ["DatabaseManager", "TestDatabaseManager"],
1020 | "issues_found": [],
1021 | # Assessment field removed - using precommit_type instead
1022 | # Confidence field removed - using precommit_type instead
1023 | "model": "flash",
1024 | },
1025 | )
1026 |
1027 | if not response4:
1028 | self.logger.error("Failed to complete to final step")
1029 | return False
1030 |
1031 | response4_data = self._parse_precommit_response(response4)
1032 |
1033 | # Validate step 4 - should use fully_embedded for expert analysis
1034 | file_context4 = response4_data.get("file_context", {})
1035 | if file_context4.get("type") != "fully_embedded":
1036 | self.logger.error("Step 4 (final) should use fully_embedded file context")
1037 | return False
1038 |
1039 | if "expert analysis" not in file_context4.get("context_optimization", "").lower():
1040 | self.logger.error("Final step should mention expert analysis in context optimization")
1041 | return False
1042 |
1043 | # Verify expert analysis was triggered
1044 | if response4_data.get("status") != "calling_expert_analysis":
1045 | self.logger.error("Final step should trigger expert analysis")
1046 | return False
1047 |
1048 | # Check that expert analysis has file context
1049 | expert_analysis = response4_data.get("expert_analysis", {})
1050 | if not expert_analysis:
1051 | self.logger.error("Expert analysis should be present in final step")
1052 | return False
1053 |
1054 | self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
1055 |
1056 | # Validate the complete workflow progression
1057 | progression_summary = {
1058 | "step_1": "reference_only (new conversation, intermediate)",
1059 | "step_2": "reference_only (continuation, intermediate)",
1060 | "step_3": "reference_only (continuation, intermediate)",
1061 | "step_4": "fully_embedded (continuation, final)",
1062 | }
1063 |
1064 | self.logger.info(" 📋 File context progression:")
1065 | for step, context_type in progression_summary.items():
1066 | self.logger.info(f" {step}: {context_type}")
1067 |
1068 | self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
1069 | return True
1070 |
1071 | except Exception as e:
1072 | self.logger.error(f"Multi-step file context test failed: {e}")
1073 | return False
1074 |
```
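
A note on the pattern above: every step asserts the same file-context rule, so it is worth stating once. While `next_step_required` is true the tool should only reference files (`reference_only`); on the final step it should embed them (`fully_embedded`) so the expert model sees full content. Below is a minimal sketch of that rule as a standalone, hypothetical helper; it restates the contract the tests assert and is not a function from this codebase.

```python
# Hypothetical restatement of the file-context rule asserted by the
# precommit workflow tests; illustration only, not server code.

def expected_file_context(next_step_required: bool) -> str:
    """Intermediate steps reference files; the final step embeds them."""
    return "reference_only" if next_step_required else "fully_embedded"

# Spot-check against the progression logged in _test_multi_step_file_context:
assert expected_file_context(True) == "reference_only"   # steps 1-3 (intermediate)
assert expected_file_context(False) == "fully_embedded"  # step 4 (final step)
```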