This is page 4 of 4. Use http://codebase.md/ilikepizza2/qa-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .env.example
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── LICENSE
├── main.py
├── mcp_server.py
├── README.md
├── requirements.txt
├── src
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── auth_agent.py
│ │ ├── crawler_agent.py
│ │ ├── js_utils
│ │ │ └── xpathgenerator.js
│ │ └── recorder_agent.py
│ ├── browser
│ │ ├── __init__.py
│ │ ├── browser_controller.py
│ │ └── panel
│ │ └── panel.py
│ ├── core
│ │ ├── __init__.py
│ │ └── task_manager.py
│ ├── dom
│ │ ├── buildDomTree.js
│ │ ├── history
│ │ │ ├── service.py
│ │ │ └── view.py
│ │ ├── service.py
│ │ └── views.py
│ ├── execution
│ │ ├── __init__.py
│ │ └── executor.py
│ ├── llm
│ │ ├── __init__.py
│ │ ├── clients
│ │ │ ├── azure_openai_client.py
│ │ │ ├── gemini_client.py
│ │ │ └── openai_client.py
│ │ └── llm_client.py
│ ├── security
│ │ ├── __init__.py
│ │ ├── nuclei_scanner.py
│ │ ├── semgrep_scanner.py
│ │ ├── utils.py
│ │ └── zap_scanner.py
│ └── utils
│ ├── __init__.py
│ ├── image_utils.py
│ └── utils.py
└── test_schema.md
```
# Files
--------------------------------------------------------------------------------
/src/agents/recorder_agent.py:
--------------------------------------------------------------------------------
```python
  1 | # /src/agents/recorder_agent.py
2 | import json
3 | from importlib import resources
4 | import logging
5 | import time
6 | import re
7 | from patchright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError
8 | from typing import Dict, Any, Optional, List, Tuple, Union, Literal
9 | import random
10 | import os
11 | import threading # For timer
12 | from datetime import datetime, timezone
13 | from pydantic import BaseModel, Field
14 | from PIL import Image
15 | import io
16 |
17 | # Use relative imports within the package
18 | from ..browser.browser_controller import BrowserController
19 | from ..browser.panel.panel import Panel
20 | from ..llm.llm_client import LLMClient
21 | from ..core.task_manager import TaskManager
22 | from ..dom.views import DOMState, DOMElementNode, SelectorMap # Import DOM types
23 | # Configure logger
24 | logger = logging.getLogger(__name__)
25 |
# --- Recorder Settings ---
INTERACTIVE_TIMEOUT_SECS = 0  # Seconds the user gets to override an AI suggestion; 0 presumably disables the wait — TODO confirm against the interactive loop
DEFAULT_WAIT_AFTER_ACTION = 0.5  # Default small wait (seconds) added after recorded actions
# --- End Recorder Settings ---
30 |
class PlanSubtasksSchema(BaseModel):
    """Structured LLM output for test planning.

    Returned by `llm_client.generate_json` in `_plan_subtasks`: the feature
    description broken into an ordered list of natural-language step strings.
    """
    planned_steps: List[str] = Field(..., description="List of planned test step descriptions as strings.")
34 |
class LLMVerificationParamsSchema(BaseModel):
    """Parameters accompanying a successful verification.

    All fields are optional: which ones are populated depends on the
    `assertion_type` chosen in the enclosing `LLMVerificationSchema`
    (e.g. text assertions fill `expected_text`, count assertions fill
    `expected_count`).
    """
    # Used by assert_text_equals / assert_text_contains.
    expected_text: Optional[str] = Field(None, description="Expected text for equals/contains assertions.")
    # Both used together by assert_attribute_equals.
    attribute_name: Optional[str] = Field(None, description="Attribute name for attribute_equals assertion.")
    expected_value: Optional[str] = Field(None, description="Expected value for attribute_equals assertion.")
    # Used by assert_element_count.
    expected_count: Optional[int] = Field(None, description="Expected count for element_count assertion.")
41 |
class LLMVerificationSchema(BaseModel):
    """Structured LLM output for a verification step.

    The LLM sets `verified` plus, on success, an `assertion_type` and either
    an interactive `element_index` or a static `verification_static_id`.
    The system (not the LLM) later fills `verification_selector`.
    """
    verified: bool = Field(..., description="True if the condition is met, False otherwise.")
    # Closed set of deterministic assertion types the executor understands.
    assertion_type: Optional[Literal[
        'assert_text_equals',
        'assert_text_contains',
        'assert_visible',
        'assert_llm_verification',
        'assert_hidden',
        'assert_attribute_equals',
        'assert_element_count',
        'assert_checked',
        'assert_enabled',
        'assert_disabled',
        'assert_not_checked'
    ]] = Field(None, description="Required if verified=true. Type of assertion suggested, reflecting the *actual observed state*.")
    element_index: Optional[int] = Field(None, description="Index of the *interactive* element [index] from context that might *also* relate to the verification (e.g., the button just clicked), if applicable. Set to null if verification relies solely on a static element or non-indexed element.")
    verification_selector: Optional[str] = Field(None, description="Final CSS selector for the verifying element (generated by the system based on index or static_id). LLM should output null for this field.")
    verification_static_id: Optional[str] = Field(None, description="Temporary ID (e.g., 's12') of the static element from context that confirms the verification, if applicable. Use this *instead* of verification_selector for static elements.")
    # BUGFIX: was `default_factory=dict`, which left a plain `{}` in a field
    # declared as Optional[LLMVerificationParamsSchema] (pydantic does not
    # validate defaults unless asked), so attribute access on the default
    # would fail. Default to an empty params model instead.
    parameters: Optional[LLMVerificationParamsSchema] = Field(default_factory=LLMVerificationParamsSchema, description="Parameters for the assertion based on the *actual observed state*. Required if assertion type needs params (e.g., assert_text_equals).")
    reasoning: str = Field(..., description="Explanation for the verification result, explaining how the intent is met or why it failed. If verified=true, justify the chosen selector and parameters.")
63 |
64 |
class ReplanSchema(BaseModel):
    """Structured LLM output for mid-run re-planning.

    Exactly one of `recovery_steps` or `action='abort'` is expected to be
    populated: either a short recovery plan, or a decision to stop.
    """
    recovery_steps: Optional[List[str]] = Field(None, description="List of recovery step descriptions (1-3 steps), if recovery is possible.")
    action: Optional[Literal["abort"]] = Field(None, description="Set to 'abort' if recovery is not possible/safe.")
    reasoning: Optional[str] = Field(None, description="Reasoning, especially required if action is 'abort'.")
70 |
class RecorderSuggestionParamsSchema(BaseModel):
    """Parameters for a suggested recorder action.

    Which fields are required depends on the `action` in the enclosing
    `RecorderSuggestionSchema` (e.g. `text` for type, `keys` for key_press,
    `destination_index` for drag_and_drop, `option_label` for select).
    """
    index: Optional[int] = Field(None, description="Index of the target element from context (required for click/type/check/uncheck/key_press/drag_and_drop source).")
    keys: Optional[str] = Field(None, description="Key(s) to press (required for key_press action). E.g., 'Enter', 'Control+A'.")
    destination_index: Optional[int] = Field(None, description="Index of the target element for drag_and_drop action.")
    option_label: Optional[str] = Field(None, description="Visible text/label of the option to select (**required for select action**)")
    text: Optional[str] = Field(None, description="Text to type (required for type action).")
78 |
class RecorderSuggestionSchema(BaseModel):
    """Structured LLM output suggesting the next recorded browser action.

    `action` is either a concrete browser action or a status marker
    (`action_not_applicable` / `suggestion_failed`).
    """
    action: Literal["click", "type", "select", "check", "uncheck", "key_press", "drag_and_drop", "action_not_applicable", "suggestion_failed"] = Field(..., description="The suggested browser action or status.")
    # BUGFIX: was `default_factory=dict`, which left a plain `{}` in a field
    # declared (non-Optional) as RecorderSuggestionParamsSchema — attribute
    # access like `.index` on the default would raise AttributeError. Default
    # to an empty params model instead.
    parameters: RecorderSuggestionParamsSchema = Field(default_factory=RecorderSuggestionParamsSchema, description="Parameters for the action (index, text, option_label).")
    reasoning: str = Field(..., description="Explanation for the suggestion.")
84 |
class AssertionTargetIndexSchema(BaseModel):
    """Structured LLM output naming the target element of a manual assertion.

    `index` refers to the interactive-element indices shown in the DOM
    context; null means no suitable element could be identified.
    """
    index: Optional[int] = Field(None, description="Index of the most relevant element from context, or null if none found/identifiable.")
    reasoning: Optional[str] = Field(None, description="Reasoning, especially if index is null.")
89 |
90 |
91 | class WebAgent:
92 | """
93 | Orchestrates AI-assisted web test recording, generating reproducible test scripts.
94 | Can also function in a (now legacy) execution mode.
95 | """
96 |
    def __init__(self,
                 llm_client: LLMClient,
                 headless: bool = True,  # Note: interactive recorder mode forces non-headless
                 max_iterations: int = 50,  # Max planned steps to process in recorder mode
                 max_history_length: int = 10,
                 max_retries_per_subtask: int = 1,  # Retries for *AI suggestion* or failed *execution* during recording
                 max_extracted_data_history: int = 7,  # Less relevant for recorder? Keep for now.
                 is_recorder_mode: bool = False,
                 automated_mode: bool = False,
                 filename: str = "",
                 baseline_dir: str = "./visual_baselines"):
        """Initialize the agent: LLM client, browser, task manager, and recorder state.

        Args:
            llm_client: Client used for planning, suggestions, and verification.
            headless: Requested headless mode; overridden to False for interactive recording.
            max_iterations: Upper bound on planned steps processed.
            max_history_length: Rolling history buffer size.
            max_retries_per_subtask: Retry budget per planned step.
            max_extracted_data_history: Cap for extracted-data history entries.
            is_recorder_mode: Enables recorder behavior (vs legacy execution mode).
            automated_mode: Non-interactive run; allows headless recording.
            filename: Base name; also prefixes the browser auth-state file.
            baseline_dir: Directory for visual baseline images/metadata (created if missing).
        """
        self.llm_client = llm_client
        self.is_recorder_mode = is_recorder_mode
        self.baseline_dir = os.path.abspath(baseline_dir)  # store as absolute path
        # Create the baseline directory up front so later saves don't fail on a missing dir;
        # failure is non-fatal (only baseline saving is affected).
        try:
            os.makedirs(self.baseline_dir, exist_ok=True)
            logger.info(f"Visual baseline directory set to: {self.baseline_dir}")
        except OSError as e:
            logger.warning(f"Could not create baseline directory '{self.baseline_dir}': {e}. Baseline saving might fail.")

        # Determine effective headless: interactive recording needs a visible browser,
        # so recorder mode forces headless off unless running fully automated.
        effective_headless = headless
        if self.is_recorder_mode and not automated_mode:
            effective_headless = False  # Interactive recording needs visible browser
            if headless:
                logger.warning("Interactive Recorder mode initiated, but headless=True was requested. Forcing headless=False.")
        elif automated_mode and not headless:
            logger.info("Automated mode running with visible browser (headless=False).")

        # NOTE(review): with the default filename="" the auth-state path becomes
        # "_auth_state.json" — confirm that is intentional.
        self.browser_controller = BrowserController(headless=effective_headless, auth_state_path='_'.join([filename, "auth_state.json"]))
        self.panel = Panel()
        # TaskManager manages the *planned* steps generated by LLM initially
        self.task_manager = TaskManager(max_retries_per_subtask=max_retries_per_subtask)
        self.history: List[Dict[str, Any]] = []
        self.extracted_data_history: List[Dict[str, Any]] = []  # kept for potential context, less critical in recorder mode
        self.max_iterations = max_iterations  # limit for planned-step processing
        self.max_history_length = max_history_length
        self.max_extracted_data_history = max_extracted_data_history
        self.output_file_path: Optional[str] = None  # path for the recorded JSON
        self.file_name = filename
        self.feature_description: Optional[str] = None
        self._latest_dom_state: Optional[DOMState] = None
        self._consecutive_suggestion_failures = 0  # failures for the *same* step index
        self._last_failed_step_index = -1  # which step index had the last failure
        self._last_static_id_map: Dict[str, 'DOMElementNode'] = {}
        # --- Recorder Specific State ---
        self.recorded_steps: List[Dict[str, Any]] = []
        self._current_step_id = 1  # counter for recorded steps
        self._user_abort_recording = False
        # --- End Recorder Specific State ---
        self.automated_mode = automated_mode

        # Log effective mode
        automation_status = "Automated" if self.automated_mode else "Interactive"
        logger.info(f"WebAgent (Recorder Mode / {automation_status}) initialized (headless={effective_headless}, max_planned_steps={max_iterations}, max_hist={max_history_length}, max_retries={max_retries_per_subtask}).")
        logger.info(f"Visual baseline directory: {self.baseline_dir}")
155 |
156 |
157 | def _add_to_history(self, entry_type: str, data: Any):
158 | """Adds an entry to the agent's history, maintaining max length."""
159 | timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
160 | log_data_str = "..."
161 | try:
162 | # Basic sanitization (same as before)
163 | if isinstance(data, dict):
164 | log_data = {k: (str(v)[:200] + '...' if len(str(v)) > 200 else v)
165 | for k, v in data.items()}
166 | elif isinstance(data, (str, bytes)):
167 | log_data = str(data[:297]) + "..." if len(data) > 300 else str(data)
168 | else:
169 | log_data = data
170 | log_data_str = str(log_data)
171 | if len(log_data_str) > 300: log_data_str = log_data_str[:297]+"..."
172 | except Exception as e:
173 | logger.warning(f"Error sanitizing history data: {e}")
174 | log_data = f"Error processing data: {e}"
175 | log_data_str = log_data
176 |
177 | entry = {"timestamp": timestamp, "type": entry_type, "data": log_data}
178 | self.history.append(entry)
179 | if len(self.history) > self.max_history_length:
180 | self.history.pop(0)
181 | logger.debug(f"[HISTORY] Add: {entry_type} - {log_data_str}")
182 |
183 | def _get_history_summary(self) -> str:
184 | """Provides a concise summary of the recent history for the LLM."""
185 | if not self.history: return "No history yet."
186 | summary = "Recent History (Oldest First):\n"
187 | for entry in self.history:
188 | entry_data_str = str(entry['data'])
189 | if len(entry_data_str) > 300: entry_data_str = entry_data_str[:297] + "..."
190 | summary += f"- [{entry['type']}] {entry_data_str}\n"
191 | return summary.strip()
192 |
    def _clean_llm_response_to_json(self, llm_output: str) -> Optional[Dict[str, Any]]:
        """Attempts to extract and parse an action JSON object from raw LLM output.

        Extraction order: a fenced ```json block first, then the outermost
        `{...}` span. The candidate string is then heuristically cleaned
        (quote escaping, escape normalization, trailing-comma removal) before
        parsing. Returns the parsed dict (guaranteed to contain "action" and
        "parameters" keys) or None on any failure; parse failures are also
        recorded in history.
        """
        logger.debug(f"[LLM PARSE] Attempting to parse LLM response (length: {len(llm_output)}).")
        # Prefer an explicit markdown code fence containing a JSON object.
        match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", llm_output, re.DOTALL | re.IGNORECASE)
        if match:
            json_str = match.group(1).strip()
            logger.debug(f"[LLM PARSE] Extracted JSON from markdown block.")
        else:
            # Fall back to the widest brace-delimited span in the raw output.
            start_index = llm_output.find('{')
            end_index = llm_output.rfind('}')
            if start_index != -1 and end_index != -1 and end_index > start_index:
                json_str = llm_output[start_index : end_index + 1].strip()
                logger.debug(f"[LLM PARSE] Attempting to parse extracted JSON between {{ and }}.")
            else:
                logger.warning("[LLM PARSE] Could not find JSON structure in LLM output.")
                self._add_to_history("LLM Parse Error", {"reason": "No JSON structure found", "raw_output_snippet": llm_output[:200]})
                return None

        # Pre-processing: best-effort repair of common LLM JSON defects.
        # Any cleaning failure is non-fatal; we fall through and try to parse
        # whatever we have.
        try:
            # Escape unescaped inner double-quotes inside the string values of
            # a known set of keys (LLMs often emit raw quotes inside reasoning).
            def escape_quotes_replacer(match):
                key_part, colon_part, open_quote, value, close_quote = match.groups()
                escaped_value = re.sub(r'(?<!\\)"', r'\\"', value)
                return f'{key_part}{colon_part}{open_quote}{escaped_value}{close_quote}'
            keys_to_escape = ["selector", "text", "reasoning", "url", "result", "answer", "reason", "file_path", "expected_text", "attribute_name", "expected_value"]
            pattern_str = r'(\"(?:' + '|'.join(keys_to_escape) + r')\")(\s*:\s*)(\")(.*?)(\")'
            pattern = re.compile(pattern_str, re.DOTALL)
            json_str = pattern.sub(escape_quotes_replacer, json_str)
            # Normalize doubled escapes, then turn escaped newlines into real ones.
            # NOTE(review): replacing '\\n' with a literal newline puts raw control
            # characters inside JSON strings, which json.loads rejects in strict
            # mode — confirm this path is actually helping on real outputs.
            json_str = json_str.replace('\\\\n', '\\n').replace('\\n', '\n')
            json_str = json_str.replace('\\\\"', '\\"')
            json_str = json_str.replace('\\\\t', '\\t')
            # Drop trailing commas before a closing brace/bracket (invalid JSON).
            json_str = re.sub(r',\s*([\}\]])', r'\1', json_str)
        except Exception as clean_e:
            logger.warning(f"[LLM PARSE] Error during pre-parsing cleaning: {clean_e}")

        # Attempt parsing; require an 'action' key, and default 'parameters'
        # to {} so downstream code can rely on its presence.
        try:
            parsed_json = json.loads(json_str)
            if isinstance(parsed_json, dict) and "action" in parsed_json:
                if "parameters" not in parsed_json:
                    parsed_json["parameters"] = {}  # ensure parameters key exists
                logger.debug(f"[LLM PARSE] Successfully parsed action JSON: {parsed_json}")
                return parsed_json
            else:
                logger.warning(f"[LLM PARSE] Parsed JSON missing 'action' key or is not a dict: {parsed_json}")
                self._add_to_history("LLM Parse Error", {"reason": "Missing 'action' key", "parsed_json": parsed_json, "cleaned_json_string": json_str[:200]})
                return None
        except json.JSONDecodeError as e:
            logger.error(f"[LLM PARSE] Failed to decode JSON from LLM output: {e}")
            logger.error(f"[LLM PARSE] Faulty JSON string snippet (around pos {e.pos}): {json_str[max(0, e.pos-50):e.pos+50]}")
            self._add_to_history("LLM Parse Error", {"reason": f"JSONDecodeError: {e}", "error_pos": e.pos, "json_string_snippet": json_str[max(0, e.pos-50):e.pos+50]})
            return None
        except Exception as e:
            logger.error(f"[LLM PARSE] Unexpected error during final JSON parsing: {e}", exc_info=True)
            return None
249 |
250 | def _plan_subtasks(self, feature_description: str):
251 | """Uses the LLM to break down the feature test into planned steps using generate_json."""
252 | logger.info(f"Planning test steps for feature: '{feature_description}'")
253 | self.feature_description = feature_description
254 |
255 | # --- Prompt Construction (Adjusted for generate_json) ---
256 | prompt = f"""
257 | You are an AI Test Engineer planning steps for recording. Given the feature description: "{feature_description}"
258 |
259 | Break this down into a sequence of specific browser actions or verification checks.
260 | Each step should be a single instruction (e.g., "Navigate to...", "Click the 'Submit' button", "Type 'testuser' into username field", "Verify text 'Success' is visible").
261 | The recorder agent will handle identifying elements and generating selectors based on these descriptions.
262 |
263 | **Key Types of Steps to Plan:**
264 | 1. **Navigation:** `Navigate to https://example.com/login`
265 | 2. **Action:** `Click element 'Submit Button'` or `Type 'testuser' into element 'Username Input'` or `Check 'male' radio button or Check 'Agree to terms & conditions'` or `Uncheck the 'subscribe to newsletter' checkbox` (Describe the element clearly). **IMPORTANT for Dropdowns (<select>):** If the task involves selecting an option (e.g., "Select 'Canada' from the 'Country' dropdown"), generate a **SINGLE step** like: `Select option 'Canada' in element 'Country Dropdown'` (Describe the main `<select>` element and the option's visible text/label).
266 | 3. **Key Press:** `Press 'Enter' key on element 'Search Input'`, `Press 'Tab' key`. (Specify the element if the press is targeted, otherwise it might be global).
267 | 4. **Drag and Drop:** `Drag element 'Item A' onto element 'Cart Area'`. (Clearly describe source and target).
268 | 5. **Wait:** `Wait for 5 seconds`
269 | 6. **Verification:** Phrase as a check. The recorder will prompt for specifics.
270 | - `Verify 'Login Successful' message is present`
271 | - `Verify 'Cart Count' shows 1`
272 | - `Verify 'Submit' button is disabled`
273 | - **GOOD:** `Verify login success indicator is visible` (More general)
274 | - **AVOID:** `Verify text 'Welcome John Doe!' is visible` (Too specific if name changes)
275 | 7. **Scrolling:** `Scroll down` (if content might be off-screen)
276 | 8. **Visual Baseline Capture:** If the feature description implies capturing a visual snapshot at a key state (e.g., after login, after adding to cart), use: `Visually baseline the [short description of state]`. Examples: `Visually baseline the login page`, `Visually baseline the dashboard after login`, `Visually baseline the product details page`.
277 |
278 | **CRITICAL:** Focus on the *intent* of each step. Do NOT include specific selectors or indices in the plan. The recorder determines those interactively.
279 |
280 | **Output Format:** Respond with a JSON object conforming to the following structure:
281 | {{
282 | "planned_steps": ["Step 1 description", "Step 2 description", ...]
283 | }}
284 |
285 | Example Test Case: "Test login on example.com with user 'tester' and pass 'pwd123', then verify the welcome message 'Welcome, tester!' is shown."
286 | Example JSON Output Structure:
287 | {{
288 | "planned_steps": [
289 | "Navigate to https://example.com/login",
290 | "Type 'tester' into element 'username input field'",
291 | "Type 'pwd123' into element 'password input field'",
292 | "Click element 'login button'",
293 | "Verify 'Welcome, tester!' message is present"
294 | "Visually baseline the user dashboard"
295 | ]
296 | }}
297 |
298 | Now, generate the JSON object containing the planned steps for: "{feature_description}"
299 | """
300 | # --- End Prompt ---
301 |
302 | logger.debug(f"[TEST PLAN] Sending Planning Prompt (snippet):\n{prompt[:500]}...")
303 |
304 | response_obj = self.llm_client.generate_json(PlanSubtasksSchema, prompt)
305 |
306 | subtasks = None
307 | raw_response_for_history = "N/A (Used generate_json)"
308 |
309 | if isinstance(response_obj, PlanSubtasksSchema):
310 | logger.debug(f"[TEST PLAN] LLM JSON response parsed successfully: {response_obj}")
311 | # Validate the parsed list
312 | if isinstance(response_obj.planned_steps, list) and all(isinstance(s, str) and s for s in response_obj.planned_steps):
313 | subtasks = response_obj.planned_steps
314 | logger.info(f"Subtasks: {subtasks}")
315 | else:
316 | logger.warning(f"[TEST PLAN] Parsed JSON planned_steps is not a list of non-empty strings: {response_obj.planned_steps}")
317 | raw_response_for_history = f"Parsed object invalid content: {response_obj}" # Log the invalid object
318 | elif isinstance(response_obj, str): # Handle error string from generate_json
319 | logger.error(f"[TEST PLAN] Failed to generate/parse planned steps JSON from LLM: {response_obj}")
320 | raw_response_for_history = response_obj[:500]+"..."
321 | else: # Handle unexpected return type
322 | logger.error(f"[TEST PLAN] Unexpected response type from generate_json: {type(response_obj)}")
323 | raw_response_for_history = f"Unexpected type: {type(response_obj)}"
324 |
325 | # --- Update Task Manager ---
326 | if subtasks and len(subtasks) > 0:
327 | self.task_manager.add_subtasks(subtasks) # TaskManager stores the *planned* steps
328 | self._add_to_history("Test Plan Created", {"feature": feature_description, "steps": subtasks})
329 | logger.info(f"Successfully planned {len(subtasks)} test steps.")
330 | logger.debug(f"[TEST PLAN] Planned Steps: {subtasks}")
331 | else:
332 | logger.error("[TEST PLAN] Failed to generate or parse valid planned steps from LLM response.")
333 | # Use the captured raw_response_for_history which contains error details
334 | self._add_to_history("Test Plan Failed", {"feature": feature_description, "raw_response": raw_response_for_history})
335 | raise ValueError("Failed to generate a valid test plan from the feature description.")
336 |
337 | def _save_visual_baseline(self, baseline_id: str, screenshot_bytes: bytes, selector: Optional[str] = None) -> bool:
338 | """Saves the screenshot and metadata for a visual baseline."""
339 | if not screenshot_bytes:
340 | logger.error(f"Cannot save baseline '{baseline_id}', no screenshot bytes provided.")
341 | return False
342 |
343 | image_path = os.path.join(self.baseline_dir, f"{baseline_id}.png")
344 | metadata_path = os.path.join(self.baseline_dir, f"{baseline_id}.json")
345 |
346 | # --- Prevent Overwrite (Optional - Prompt user or fail) ---
347 | if os.path.exists(image_path) or os.path.exists(metadata_path):
348 | if self.automated_mode:
349 | logger.warning(f"Baseline '{baseline_id}' already exists. Overwriting in automated mode.")
350 | # Allow overwrite in automated mode
351 | else: # Interactive mode
352 | overwrite = input(f"Baseline '{baseline_id}' already exists. Overwrite? (y/N) > ").strip().lower()
353 | if overwrite != 'y':
354 | logger.warning(f"Skipping baseline save for '{baseline_id}' - user chose not to overwrite.")
355 | return False # Indicate skipped save
356 | # --- End Overwrite Check ---
357 |
358 | try:
359 | # 1. Save Image
360 | img = Image.open(io.BytesIO(screenshot_bytes))
361 | img.save(image_path, format='PNG')
362 | logger.info(f"Saved baseline image to: {image_path}")
363 |
364 | # 2. Gather Metadata
365 | current_url = self.browser_controller.get_current_url()
366 | viewport_size = self.browser_controller.get_viewport_size()
367 | browser_info = self.browser_controller.get_browser_version()
368 | os_info = self.browser_controller.get_os_info()
369 | timestamp = datetime.now(timezone.utc).isoformat()
370 |
371 | metadata = {
372 | "baseline_id": baseline_id,
373 | "image_file": os.path.basename(image_path), # Store relative path
374 | "created_at": timestamp,
375 | "updated_at": timestamp, # Same initially
376 | "url_captured": current_url,
377 | "viewport_size": viewport_size,
378 | "browser_info": browser_info,
379 | "os_info": os_info,
380 | "selector_captured": selector # Store selector if it was an element capture
381 | }
382 |
383 | # 3. Save Metadata
384 | with open(metadata_path, 'w', encoding='utf-8') as f:
385 | json.dump(metadata, f, indent=2, ensure_ascii=False)
386 | logger.info(f"Saved baseline metadata to: {metadata_path}")
387 |
388 | return True # Success
389 |
390 | except Exception as e:
391 | logger.error(f"Error saving baseline '{baseline_id}' (Image: {image_path}, Meta: {metadata_path}): {e}", exc_info=True)
392 | # Clean up potentially partially saved files
393 | if os.path.exists(image_path): os.remove(image_path)
394 | if os.path.exists(metadata_path): os.remove(metadata_path)
395 | return False # Failure
396 |
397 |
398 |
399 | def _get_llm_verification(self,
400 | verification_description: str,
401 | current_url: str,
402 | dom_context_str: str,
403 | static_id_map: Dict[str, Any],
404 | screenshot_bytes: Optional[bytes] = None,
405 | previous_error: Optional[str] = None
406 | ) -> Optional[Dict[str, Any]]:
407 | """
408 | Uses LLM's generate_json (potentially multimodal) to verify if a condition is met.
409 | Returns a dictionary representation of the result or None on error.
410 | """
411 |
412 | logger.info(f"Requesting LLM verification (using generate_json) for: '{verification_description}'")
413 | logger.info(f"""dom_context_str:
414 | {dom_context_str}
415 | """)
416 | # --- Prompt Adjustment for generate_json ---
417 | prompt = f"""
418 | You are an AI Test Verification Assistant. Your task is to determine if a specific condition, **or its clear intent**, is met based on the current web page state and If the goal IS met, propose the **most robust and specific, deterministic assertion** possible to confirm this state.
419 |
420 | **Overall Goal:** {self.feature_description}
421 | **Verification Step:** {verification_description}
422 | **Current URL:** {current_url}
423 |
424 | {f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else ""}
425 |
426 | **Input Context (Visible Elements with Indices for Interactive ones):**
427 | This section shows visible elements on the page.
428 | - Interactive elements are marked with `[index]` (e.g., `[5]<button>Submit</button>`).
429 | - Static elements crucial for context are marked with `(Static)` (e.g., `<p (Static)>Login Successful!</p>`).
430 | - Some plain static elements may include a hint about their parent, like `(inside: <div id="summary">)`, to help locate them.
431 | - Some may be not visible but are interactive, assertable
432 | ```html
433 | {dom_context_str}
434 | ```
435 | {f"**Screenshot Analysis:** Please analyze the attached screenshot for visual confirmation or contradiction of the verification step." if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."}
436 |
437 | **Your Task:**
438 | 1. Analyze the provided context (DOM, URL, and screenshot if provided).
439 | 2. Determine if the **intent** behind the "Verification Step" is currently TRUE or FALSE.
440 | * Example: If the step is "Verify 'Login Complete'", but the page shows "Welcome, User!", the *intent* IS met.
441 | 3. Respond with a JSON object matching the required schema.
442 | * Set the `verified` field (boolean).
443 | * Provide detailed `reasoning` (string), explaining *how* the intent is met or why it failed.
444 | * **If `verified` is TRUE:**
445 | * Identify the **single most relevant element** (interactive OR static) in the context that **specifically confirms the successful outcome** of the preceding action(s).
446 | * If confirmed by an **interactive element `[index]`**: Set `element_index` to that index. `verification_static_id` should be null.
447 | * If confirmed by a **static element shown with `data-static-id="sXXX"`**: Set `verification_static_id` to that ID string (e.g., "s15"). `element_index` should be null.
448 | * **`verification_selector` (Set to null by you):** The system will generate the final CSS selector based on the provided index or static ID. Leave this field as `null`.
449 | * **`assertion_type` (Required):** Propose a specific, deterministic assertion. Determine the most appropriate assertion type based on the **actual observed state** and the verification intent. **Prefer `assert_text_contains` for text verification unless the step demands an *exact* match.**
450 | * Use `assert_llm_verification` for cases where vision LLMs will be better than any other selector for this problem. Visual UI stuff like overflow, truncation, overlap, positioning all this can't be determined via normal playwright automation. You need llm vision verification for such stuff.
451 | * Use `assert_text_equals` / `assert_text_contains` for text content. Prefer this over visible unless text doesn't confirm the verification. Note to use the **EXACT TEXT INCLUDING ANY PUNCTUATION**
452 | * Use `assert_checked` if the intent is to verify a checkbox or radio button **is currently selected/checked**.
453 | * Use `assert_not_checked` if the intent is to verify it **is NOT selected/checked**.
454 | * Use `assert_visible` / `assert_hidden` for visibility states.
455 | * Use `assert_disabled` / `assert_enabled` for checking disabled states
456 | * Use `assert_attribute_equals` ONLY for comparing the *string value* of an attribute (e.g., `class="active"`, `value="Completed"`). **DO NOT use it for boolean attributes like `checked`, `disabled`, `selected`. Use state assertions instead.**
457 | * Use `assert_element_count` for counting elements matching a selector. **Note that you can't count child elements by using static id of parent elements. You need to use the selector for the elements you need to count**
458 | * **`parameters` (Optional):** Provide necessary parameters ONLY if the chosen `assertion_type` requires them (e.g., `assert_text_equals` needs `expected_text`). For `assert_checked`, `assert_not_checked`, `assert_visible`, `assert_hidden`, `assert_disabled`, `assert_enabled`, parameters should generally be empty (`{{}}`) or omitted. Ensure parameters reflect the *actual observed state* (e.g., observed text).
459 | * **Guidance for Robustness:**
460 | * **Prefer specific checks:** `assert_text_contains` on a specific message is better than just `assert_visible` on a generic container *if* the text confirms the verification goal.
461 | * **Dynamic Content:** For content that loads (e.g., images replacing placeholders, data appearing), prefer assertions on **stable containers** (`assert_element_count`, `assert_visible` on the container) or the **final loaded state** if reliably identifiable, rather than the transient loading state (like placeholder text).
462 | * **Element Targeting:** Identify the **single most relevant element** (interactive `[index]` OR static `data-static-id="sXXX"`) that **proves** the assertion. Set `element_index` OR `verification_static_id` accordingly.
463 | * **`verification_selector` (Set to null):** The system generates the final selector. Leave this null.
464 | * **`parameters` (Required if needed):** Provide necessary parameters based on the chosen `assertion_type` and the **actual observed state** (e.g., the exact text seen for `assert_text_contains`). Empty `{{}}` if no parameters needed (e.g., `assert_visible`).
465 | * **If `verified` is FALSE:**
466 | * `assertion_type`, `element_index`, `verification_selector`, `parameters` should typically be null/omitted.
467 |
468 | **JSON Output Structure Examples:**
469 |
470 | *Success Case (Static Element Confirms):*
471 | ```json
472 | {{
473 | "verified": true,
474 | "assertion_type": "assert_text_contains", // Preferred over just visible
475 | "element_index": null,
476 | "verification_static_id": "s23", // ID from context
477 | "verification_selector": null,
478 | "parameters": {{ "expected_text": "logged in!" }}, // Actual text observed
479 | "reasoning": "The static element <p data-static-id='s23'> shows 'logged in!', fulfilling the verification step's goal."
480 | }}
481 | ```json
482 | {{
483 | "verified": true,
484 | "assertion_type": "assert_element_count", // Verifying 5 items loaded
485 | "element_index": null, // Index not needed if counting multiple matches
486 | "verification_static_id": "s50", // Target the container element
487 | "verification_selector": null, // System will generate selector for container s50
488 | "parameters": {{ "expected_count": 5 }},
489 | "reasoning": "The list container <ul data-static-id='s50'> visually contains 5 items, confirming the verification requirement."
490 | }}
491 | ```
492 | ```json
493 | {{
494 | "verified": true,
495 | "assertion_type": "assert_llm_verification",
496 | "element_index": null,
497 | "verification_static_id": null,
498 | "verification_selector": null,
499 | "parameters": {{ }},
500 | "reasoning": "Checking if text is overflowing is best suited for a vision assisted llm verfication"
501 | }}
502 | ```
503 | ```
504 | *Success Case (Radio Button Checked):*
505 | ```json
506 | {{
507 | "verified": true,
508 | "assertion_type": "assert_checked",
509 | "element_index": 9, // <-- LLM provides interactive index
510 | "verification_static_id": null,
511 | "verification_selector": null, // <-- LLM sets to null
512 | "parameters": {{}},
513 | "reasoning": "The 'Credit Card' radio button [9] is selected on the page, fulfilling the verification requirement."
514 | }}
515 |
516 | ```
517 | *Success Case (Checkbox Not Checked - Interactive):*
518 | ```json
519 | {{
520 | "verified": true,
521 | "assertion_type": "assert_not_checked",
522 | "element_index": 11, // <-- LLM provides interactive index
523 | "verification_static_id": null,
524 | "verification_selector": null, // <-- LLM sets to null
525 | "parameters": {{}},
526 | "reasoning": "The 'Subscribe' checkbox [11] is not checked, as required by the verification step."
527 | }}
528 | ```
529 | *Success Case (Interactive Element Confirms - Visible):*
530 | ```json
531 | {{
532 | "verified": true,
533 | "assertion_type": "assert_visible",
534 | "element_index": 8, // <-- LLM provides interactive index
535 | "verification_static_id": null,
536 | "verification_selector": null, // <-- LLM sets to null
537 | "parameters": {{}},
538 | "reasoning": "Element [8] (logout button) is visible, confirming the user is logged in as per the verification step intent."
539 | }}
540 | ```
541 | *Success Case (Attribute on Static Element):*
542 | ```json
543 | {{
544 | "verified": true,
545 | "assertion_type": "assert_attribute_equals",
546 | "element_index": null,
547 | "verification_static_id": "s45", // <-- LLM provides static ID
548 | "verification_selector": null, // <-- LLM sets to null
549 | "parameters": {{ "attribute_name": "data-status", "expected_value": "active" }},
550 | "reasoning": "The static <div data-static-id='s45' data-status='active' (Static)> element has the 'data-status' attribute set to 'active', confirming the verification requirement."
551 | }}
552 | ```
553 | *Failure Case:*
554 | ```json
555 | {{
556 | "verified": false,
557 | "assertion_type": null,
558 | "element_index": null,
559 | "verification_static_id": null,
560 | "verification_selector": null,
561 | "parameters": {{}},
562 | "reasoning": "Could not find the 'Success' message or any other indication of successful login in the provided context or screenshot."
563 | }}
564 | ```
565 |
566 | **CRITICAL:** If `verified` is true, provide *either* `element_index` for interactive elements OR `verification_static_id` for static elements. **Do not generate the `verification_selector` yourself; set it to `null`.** Explain any discrepancies between the plan and the observed state in `reasoning`. Respond ONLY with the JSON object matching the schema.
567 |
568 | Now, generate the verification JSON for: "{verification_description}"
569 | """
570 | if previous_error:
571 | err = f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else ""
572 | logger.critical(err)
573 |
574 | # Call generate_json, passing image_bytes if available
575 | logger.debug("[LLM VERIFY] Sending prompt (and potentially image) to generate_json...")
576 | response_obj = self.llm_client.generate_json(
577 | LLMVerificationSchema,
578 | prompt,
579 | image_bytes=screenshot_bytes # Pass the image bytes here
580 | )
581 |
582 | verification_json = None # Initialize
583 | if isinstance(response_obj, LLMVerificationSchema):
584 | logger.debug(f"[LLM VERIFY] Successfully parsed response: {response_obj}")
585 | verification_dict = response_obj.model_dump(exclude_none=True)
586 |
587 |
588 | assertion_type = verification_dict.get("assertion_type")
589 | params = verification_dict.get("parameters", {})
590 | needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count']
591 | no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_disabled', 'assert_enabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification']
592 |
593 |
594 |
595 | # --- Post-hoc Validation ---
596 | is_verified_by_llm = verification_dict.get("verified")
597 | if is_verified_by_llm is None:
598 | logger.error("[LLM VERIFY FAILED] Parsed JSON missing required 'verified' field.")
599 | self._add_to_history("LLM Verification Error", {"reason": "Missing 'verified' field", "parsed_dict": verification_dict})
600 | return None
601 |
602 | if not verification_dict.get("reasoning"):
603 | logger.warning(f"[LLM VERIFY] Missing 'reasoning' in response: {verification_dict}")
604 | verification_dict["reasoning"] = "No reasoning provided by LLM."
605 |
606 |
607 | # --- Selector Generation Logic ---
608 | final_selector = None # Initialize selector to be potentially generated
609 | if is_verified_by_llm:
610 | static_id = verification_dict.get("verification_static_id")
611 | interactive_index = verification_dict.get("element_index") # Keep original name from schema
612 |
613 | if static_id:
614 | # --- Static element identified by ID ---
615 | target_node = static_id_map.get(static_id)
616 | if target_node:
617 | logger.info(f"LLM identified static element via ID: {static_id}. Generating selector...")
618 | # Import or access DomService appropriately here
619 | from ..dom.service import DomService
620 | try:
621 | # Generate selector using Python logic
622 | final_selector = DomService._enhanced_css_selector_for_element(target_node)
623 | if not final_selector:
624 | logger.error(f"Failed to generate selector for static node (ID: {static_id}). Falling back to XPath.")
625 | final_selector = f"xpath={target_node.xpath}" # Fallback
626 | verification_dict["verification_selector"] = final_selector
627 | verification_dict["_static_id_used"] = static_id
628 | verification_dict["element_index"] = None # Ensure index is None for static
629 | logger.info(f"Generated selector for static ID {static_id}: {final_selector}")
630 | except Exception as gen_err:
631 | logger.error(f"Error generating selector for static ID {static_id}: {gen_err}")
632 | # Decide how to handle: fail verification? Use XPath?
633 | verification_dict["verification_selector"] = f"xpath={target_node.xpath}" # Fallback
634 | verification_dict["element_index"] = None
635 | verification_dict["reasoning"] += f" [Selector generation failed: {gen_err}]"
636 |
637 | else:
638 | logger.error(f"LLM returned static ID '{static_id}' but it wasn't found in the context map!")
639 | # Mark as failed or handle error appropriately
640 | verification_dict["verified"] = False
641 | verification_dict["reasoning"] = f"Verification failed: LLM hallucinated static ID '{static_id}'."
642 | # Clear fields that shouldn't be present on failure
643 | verification_dict.pop("assertion_type", None)
644 | verification_dict.pop("verification_static_id", None)
645 | verification_dict.pop("parameters", None)
646 |
647 | elif interactive_index is not None:
648 | # --- Interactive element identified by index ---
649 | if self._latest_dom_state and self._latest_dom_state.selector_map:
650 | target_node = self._latest_dom_state.selector_map.get(interactive_index)
651 | if target_node and target_node.css_selector:
652 | final_selector = target_node.css_selector
653 | verification_dict["verification_selector"] = final_selector
654 | # verification_dict["element_index"] is already set
655 | logger.info(f"Using pre-generated selector for interactive index {interactive_index}: {final_selector}")
656 | else:
657 | logger.error(f"LLM returned interactive index [{interactive_index}] but node or selector not found in map!")
658 | verification_dict["verified"] = False
659 | verification_dict["reasoning"] = f"Verification failed: LLM hallucinated interactive index '{interactive_index}' or selector missing."
660 | verification_dict.pop("assertion_type", None)
661 | verification_dict.pop("element_index", None)
662 | verification_dict.pop("parameters", None)
663 | else:
664 | logger.error("Cannot resolve interactive index: DOM state or selector map missing.")
665 | verification_dict["verified"] = False
666 | verification_dict["reasoning"] = "Verification failed: Internal error retrieving DOM state for interactive index."
667 | verification_dict.pop("assertion_type", None)
668 | verification_dict.pop("element_index", None)
669 | verification_dict.pop("parameters", None)
670 |
671 | else:
672 | # Verified = true, but LLM provided neither static ID nor interactive index
673 | logger.error("LLM verification PASSED but provided neither static ID nor interactive index!")
674 | # verification_dict["verified"] = False
675 | verification_dict["reasoning"] = "Verified to be true by using vision LLMs"
676 | verification_dict.pop("assertion_type", None)
677 | verification_dict.pop("parameters", None)
678 |
679 | if final_selector and target_node and self.browser_controller.page:
680 | try:
681 | handles = self.browser_controller.page.query_selector_all(final_selector)
682 | num_matches = len(handles)
683 | validation_passed = False
684 |
685 | if num_matches == 1:
686 | # Get XPath of the element found by the generated selector
687 |
688 | try:
689 | with resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js') as js_path:
690 | js_code = js_path.read_text(encoding='utf-8')
691 | logger.debug("xpathgenerator.js loaded successfully.")
692 | except FileNotFoundError:
693 | logger.error("xpathgenerator.js not found in the 'agents' package directory!")
694 | raise
695 | except Exception as e:
696 | logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True)
697 | raise
698 |
699 | # Pass a *function string* to evaluate. Playwright passes the handle as 'element'.
700 | # The function string first defines our helper, then calls it.
701 | script_to_run = f"""
702 | (element) => {{
703 | {js_code} // Define the helper function(s)
704 | return generateXPathForElement(element); // Call it
705 | }}
706 | """
707 |
708 | # Use page.evaluate, passing the script string and the element handle
709 | matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0])
710 |
711 | # Compare XPaths (simplest reliable comparison)
712 | if target_node.xpath == matched_xpath:
713 | validation_passed = True
714 | logger.info(f"Validation PASSED: Selector '{final_selector}' uniquely matches target node.")
715 | else:
716 | logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').")
717 | elif num_matches == 0:
718 | logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 0 elements.")
719 | else: # num_matches > 1
720 | logger.warning(f"Validation FAILED: Selector '{final_selector}' matched {num_matches} elements (not unique).")
721 |
722 | # --- Fallback to XPath if validation failed ---
723 | if not validation_passed:
724 | logger.warning(f"Falling back to XPath selector for target node.")
725 | original_selector = final_selector # Keep for logging/reasoning
726 | final_selector = f"xpath={target_node.xpath}"
727 | # Update reasoning if possible
728 | if "reasoning" in verification_dict:
729 | verification_dict["reasoning"] += f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]"
730 | # Update the selector in the dictionary being built
731 | verification_dict["verification_selector"] = final_selector
732 |
733 | except Exception as validation_err:
734 | logger.error(f"Error during selector validation ('{final_selector}'): {validation_err}. Falling back to XPath.")
735 | original_selector = final_selector
736 | final_selector = f"xpath={target_node.xpath}"
737 | if "reasoning" in verification_dict:
738 | verification_dict["reasoning"] += f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]"
739 | verification_dict["verification_selector"] = final_selector
740 |
741 | # --- Post-hoc validation (same as before, applied to final verification_dict) ---
742 | if verification_dict.get("verified"):
743 | assertion_type = verification_dict.get("assertion_type")
744 | params = verification_dict.get("parameters", {})
745 | needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count']
746 | no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_enabled', 'assert_disabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification']
747 |
748 | if not verification_dict.get("verification_selector"):
749 | logger.error("Internal Error: Verification marked passed but final selector is missing!")
750 | # verification_dict["verified"] = False
751 | verification_dict["reasoning"] = "Verified to be true by using vision LLMs"
752 |
753 | elif needs_params and not params:
754 | logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs parameters, but none provided: {verification_dict}")
755 | elif no_params_needed and params:
756 | logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs no parameters, but some provided: {params}. Using empty params.")
757 | verification_dict["parameters"] = {}
758 |
759 | # Assign the potentially modified dictionary
760 | verification_json = verification_dict
761 |
762 | elif isinstance(response_obj, str): # Handle error string
763 | logger.error(f"[LLM VERIFY FAILED] LLM returned an error string: {response_obj}")
764 | self._add_to_history("LLM Verification Failed", {"raw_error_response": response_obj})
765 | return None
766 | else: # Handle unexpected type
767 | logger.error(f"[LLM VERIFY FAILED] Unexpected response type from generate_json: {type(response_obj)}")
768 | self._add_to_history("LLM Verification Failed", {"response_type": str(type(response_obj))})
769 | return None
770 |
771 |
772 | if verification_json:
773 | logger.info(f"[LLM VERIFY RESULT] Verified: {verification_json['verified']}, Selector: {verification_json.get('verification_selector')}, Reasoning: {verification_json.get('reasoning', '')[:150]}...")
774 | self._add_to_history("LLM Verification Result", verification_json)
775 | return verification_json # Return the dictionary
776 | else:
777 | logger.error("[LLM VERIFY FAILED] Reached end without valid verification_json.")
778 | return None
779 |
780 | def _handle_llm_verification(self, planned_step: Dict[str, Any], verification_result: Dict[str, Any]) -> bool:
781 | """
782 | Handles the user interaction and recording after LLM verification.
783 | Now uses verification_selector as the primary target.
784 | Returns True if handled (recorded/skipped), False if aborted.
785 | """
786 | planned_desc = planned_step["description"]
787 | is_verified_by_llm = verification_result['verified']
788 | reasoning = verification_result.get('reasoning', 'N/A')
789 | step_handled = False # Flag to track if a decision was made
790 | parameter_name = None
791 |
792 | if is_verified_by_llm:
793 | final_selector = verification_result.get("verification_selector")
794 | assertion_type = verification_result.get("assertion_type")
795 | parameters = verification_result.get("parameters", {})
796 | interactive_index = verification_result.get("element_index") # Optional index hint
797 | static_id = verification_result.get("_static_id_used") # Optional static ID hint
798 |
799 | # --- Perform Quick Validation using Playwright ---
800 | validation_passed = False
801 | validation_error = "Validation prerequisites not met (missing type/selector, unless assert_llm_verification)."
802 |
803 | # Only validate if type and selector are present (or if type is assert_llm_verification)
804 | if assertion_type and (final_selector or assertion_type == 'assert_llm_verification' or assertion_type == 'assert_passed_verification'):
805 | logger.info("Performing quick Playwright validation of LLM's suggested assertion...")
806 | validation_passed, validation_error = self.browser_controller.validate_assertion(
807 | assertion_type, final_selector, parameters
808 | )
809 | elif not assertion_type:
810 | validation_passed = True
811 | assertion_type = "assert_llm_verification"
812 | # NOTE THAT THIS IS CURRENTLY TREATED AS VERIFIED BY VISION LLM
813 | # (selector check is handled inside validate_assertion)
814 |
815 | if not validation_passed:
816 | # --- Validation FAILED ---
817 | logger.warning(f"LLM assertion validation FAILED for step '{planned_desc}': {validation_error}")
818 | error_message_for_task = f"LLM assertion validation failed: {validation_error}. Original AI reasoning: {reasoning}"
819 | # Mark task as failed so the main loop can handle retry/re-planning, passing the validation error
820 | self.task_manager.update_subtask_status(
821 | self.task_manager.current_subtask_index,
822 | "failed",
823 | error=error_message_for_task,
824 | force_update=True # Ensure status changes
825 | )
826 | # No UI shown here, let main loop retry and pass error back to _get_llm_verification
827 | return True # Indicate step was handled (by failing validation)
828 |
829 | # --- Validation PASSED ---
830 | logger.info("LLM assertion validation PASSED.")
831 | # Proceed with highlighting, panel display (interactive), or direct recording (automated)
832 |
833 | # --- Automated Mode (Validated Success) ---
834 | if self.automated_mode:
835 | logger.info(f"[Auto Mode] Handling Validated LLM Verification for: '{planned_desc}'")
836 | logger.info(f"[Auto Mode] AI Result: PASSED. Validated Assertion: {assertion_type} on {final_selector}")
837 |
838 | highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct
839 | highlight_text = "Verify Target (Static)" if static_id else "Verify Target"
840 | highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0)
841 |
842 | self.browser_controller.clear_highlights()
843 | try:
844 | # Find node XPath if possible for better highlighting fallback
845 | target_node_xpath = None
846 | target_node = None
847 | if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map:
848 | target_node = self._last_static_id_map.get(static_id)
849 | elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map:
850 | target_node = self._latest_dom_state.selector_map.get(interactive_index)
851 |
852 | if target_node: target_node_xpath = target_node.xpath
853 |
854 | self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath)
855 | except Exception as hl_err:
856 | logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}")
857 | print(f"AI suggests assertion on element: {final_selector} (Highlight failed)")
858 |
859 | # Record the validated assertion automatically
860 | logger.info(f"[Auto Mode] Recording validated assertion: {assertion_type} on {final_selector}")
861 | record = {
862 | "step_id": self._current_step_id,
863 | "action": assertion_type,
864 | "description": planned_desc,
865 | "parameters": parameters,
866 | "selector": final_selector,
867 | "wait_after_secs": 0
868 | }
869 | self.recorded_steps.append(record)
870 | self._current_step_id += 1
871 | logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated - Automated)")
872 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion (automated) as step {record['step_id']}")
873 | step_handled = True
874 | self.browser_controller.clear_highlights() # Clear highlights even in auto mode
875 | return True # Indicate handled
876 |
877 | # --- Interactive Mode (Validated Success) ---
878 | else:
879 | print("\n" + "="*60)
880 | print(f"Planned Step: {planned_desc}")
881 | print(f"Review AI Verification Result (Validated) in Panel...")
882 | # Highlight element
883 | final_selector = verification_result.get("verification_selector") # Re-fetch just in case
884 | interactive_index = verification_result.get("element_index")
885 | static_id = verification_result.get("_static_id_used")
886 | highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct
887 | highlight_text = "Verify Target (Static)" if static_id else "Verify Target"
888 | highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0)
889 |
890 | self.browser_controller.clear_highlights()
891 | try:
892 | target_node_xpath = None
893 | target_node = None
894 | if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map:
895 | target_node = self._last_static_id_map.get(static_id)
896 | elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map:
897 | target_node = self._latest_dom_state.selector_map.get(interactive_index)
898 |
899 | if target_node: target_node_xpath = target_node.xpath
900 |
901 | self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath)
902 | print(f"AI suggests assertion on element (Validation PASSED): {final_selector}")
903 | except Exception as hl_err:
904 | logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}")
905 | print(f"AI suggests assertion on element (Validation PASSED): {final_selector} (Highlight failed)")
906 |
907 | print("="*60)
908 |
909 | # Show the review panel (button should be enabled now)
910 | self.panel.show_verification_review_panel(planned_desc, verification_result)
911 |
912 | # Wait for user choice from panel
913 | user_choice = self.panel.wait_for_panel_interaction(30.0) # Give more time for review
914 | if not user_choice: user_choice = 'skip' # Default to skip on timeout
915 |
916 | # --- Process User Choice ---
917 | if user_choice == 'record_ai':
918 | # Record validated assertion
919 | final_selector = verification_result.get("verification_selector")
920 | assertion_type = verification_result.get("assertion_type")
921 | parameters = verification_result.get("parameters", {})
922 | print(f"Recording validated AI assertion: {assertion_type} on {final_selector}")
923 | record = { "step_id": self._current_step_id, "action": assertion_type, "description": planned_desc,
924 | "parameters": parameters, "selector": final_selector, "wait_after_secs": 0 }
925 | self.recorded_steps.append(record)
926 | self._current_step_id += 1
927 | logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated)")
928 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion as step {record['step_id']}")
929 | step_handled = True
930 |
931 | elif user_choice == 'define_manual':
932 | print("Switching to manual assertion definition...")
933 | self.panel.hide_recorder_panel() # Hide review panel
934 | # Call the existing manual handler
935 | if not self._handle_assertion_recording(planned_step):
936 | self._user_abort_recording = True # Propagate abort signal
937 | step_handled = True # Manual path handles the step
938 |
939 | elif user_choice == 'skip':
940 | print("Skipping verification step.")
941 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped AI verification via panel")
942 | step_handled = True
943 |
944 | elif user_choice == 'abort':
945 | print("Aborting recording.")
946 | self._user_abort_recording = True
947 | step_handled = False # Abort signal
948 |
949 | else: # Includes timeout, None, unexpected values
950 | print("Invalid choice or timeout during verification review. Skipping step.")
951 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid choice/timeout on AI verification")
952 | step_handled = True
953 |
954 | # --- Cleanup UI ---
955 | self.browser_controller.clear_highlights()
956 | self.panel.hide_recorder_panel()
957 |
958 | return step_handled and not self._user_abort_recording
959 |
960 | else: # --- LLM verification FAILED initially ---
961 | logger.warning(f"[Verification] AI verification FAILED for '{planned_desc}'. Reasoning: {reasoning}")
962 | # Mode-dependent handling
963 | if self.automated_mode:
964 | logger.warning("[Auto Mode] Recording AI verification failure.")
965 | failed_record = {
966 | "step_id": self._current_step_id,
967 | "action": "assert_failed_verification", # Specific action type
968 | "description": planned_desc,
969 | "parameters": {"reasoning": reasoning},
970 | "selector": None, "wait_after_secs": 0
971 | }
972 | self.recorded_steps.append(failed_record)
973 | self._current_step_id += 1
974 | logger.info(f"Step {failed_record['step_id']} recorded (AI Verification FAILED - Automated)")
975 | self.task_manager.update_subtask_status(
976 | self.task_manager.current_subtask_index,
977 | "failed", # Mark as skipped
978 | result=f"AI verification failed: {reasoning}. Recorded as failed assertion.",
979 | force_update=True
980 | )
981 | step_handled = True
982 |
983 | else: # Interactive Mode - Fallback to Manual
984 | print("\n" + "="*60)
985 | print(f"Planned Step: {planned_desc}")
986 | print(f"AI Verification Result: FAILED (Reason: {reasoning})")
987 | print("Falling back to manual assertion definition...")
988 | print("="*60)
989 | self.browser_controller.clear_highlights()
990 | self.panel.hide_recorder_panel() # Ensure review panel isn't shown
991 | # Call the existing manual handler
992 | if not self._handle_assertion_recording(planned_step):
993 | self._user_abort_recording = True # Propagate abort
994 | step_handled = True # Manual path handles the step
995 |
996 | return step_handled and not self._user_abort_recording
997 |
998 |
999 |
1000 | def _trigger_re_planning(self, current_planned_task: Dict[str, Any], reason: str) -> bool:
1001 | """
1002 | Attempts to get recovery steps from the LLM (using generate_json, potentially multimodal)
1003 | when an unexpected state is detected.
1004 | Returns True if recovery steps were inserted, False otherwise (or if abort requested).
1005 | """
1006 | logger.warning(f"Triggering re-planning due to: {reason}")
1007 | self._add_to_history("Re-planning Triggered", {"reason": reason, "failed_step_desc": current_planned_task['description']})
1008 |
1009 | if self.automated_mode:
1010 | print = lambda *args, **kwargs: logger.info(f"[Auto Mode Replanning] {' '.join(map(str, args))}") # Redirect print
1011 | input = lambda *args, **kwargs: 'y' # Default to accepting suggestions in auto mode
1012 | else:
1013 | print("\n" + "*"*60)
1014 | print("! Unexpected State Detected !")
1015 | print(f"Reason: {reason}")
1016 | print(f"Original Goal: {self.feature_description}")
1017 | print(f"Attempting Step: {current_planned_task['description']}")
1018 | print("Asking AI for recovery suggestions...")
1019 | print("*"*60)
1020 |
1021 | # --- Gather Context for Re-planning ---
1022 | current_url = "Error getting URL"
1023 | dom_context_str = "Error getting DOM"
1024 | screenshot_bytes = None # Initialize
1025 | try:
1026 | current_url = self.browser_controller.get_current_url()
1027 | if self._latest_dom_state and self._latest_dom_state.element_tree:
1028 | dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
1029 | screenshot_bytes = self.browser_controller.take_screenshot()
1030 | except Exception as e:
1031 | logger.error(f"Error gathering context for re-planning: {e}")
1032 |
1033 | original_plan_str = "\n".join([f"- {t['description']}" for t in self.task_manager.subtasks])
1034 | history_summary = self._get_history_summary()
1035 | last_done_step_desc = "N/A (Start of test)"
1036 | for i in range(current_planned_task['index'] -1, -1, -1):
1037 | if self.task_manager.subtasks[i]['status'] == 'done':
1038 | last_done_step_desc = self.task_manager.subtasks[i]['description']
1039 | break
1040 |
1041 | # --- Construct Re-planning Prompt (Adjusted for generate_json with image) ---
1042 | prompt = f"""
1043 | You are an AI Test Recorder Assistant helping recover from an unexpected state during test recording.
1044 |
1045 | **Overall Goal:** {self.feature_description}
1046 | **Original Planned Steps:**
1047 | {original_plan_str}
1048 |
1049 | **Current Situation:**
1050 | - Last successfully completed planned step: '{last_done_step_desc}'
1051 | - Currently trying to execute planned step: '{current_planned_task['description']}' (Attempt {current_planned_task['attempts']})
1052 | - Encountered unexpected state/error: {reason}
1053 | - Current URL: {current_url}
1054 |
1055 | **Current Page Context (Interactive Elements with Indices. You can interact with non visible elements as well):**
1056 | ```html
1057 | {dom_context_str}
1058 | ```
1059 |
1060 | {f"**Screenshot Analysis:** Please analyze the attached screenshot to understand the current visual state and identify elements relevant for recovery." if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."}
1061 |
1062 |
1063 | **Your Task:**
1064 | Analyze the current situation, context (DOM, URL, screenshot if provided), and the overall goal. If you are not clear and think scrolling might help, you can scroll to see the complete page.
1065 | Generate a JSON object matching the required schema.
1066 | - **If recovery is possible:** Provide a **short sequence (1-3 steps)** of recovery actions in the `recovery_steps` field (list of strings). These steps should aim to get back on track towards the original goal OR correctly perform the intended action of the failed step ('{current_planned_task['description']}') in the *current* context. Focus ONLY on the immediate recovery. Example: `["Click element 'Close Popup Button'", "Verify 'Main Page Title' is visible"]`. `action` field should be null.
1067 | - **If recovery seems impossible, too complex, or unsafe:** Set the `action` field to `"abort"` and provide a brief explanation in the `reasoning` field. `recovery_steps` should be null. Example: `{{"action": "abort", "reasoning": "Critical error page displayed, cannot identify recovery elements."}}`
1068 |
1069 | **JSON Output Structure Examples:**
1070 | *Recovery Possible:*
1071 | ```json
1072 | {{
1073 | "recovery_steps": ["Click element 'Accept Cookies Button'", "Verify 'Main Page Title' is visible"],
1074 | "action": null,
1075 | "reasoning": null
1076 | }}
1077 | ```
1078 | *Recovery Impossible:*
1079 | ```json
1080 | {{
1081 | "recovery_steps": null,
1082 | "action": "abort",
1083 | "reasoning": "The application crashed, unable to proceed."
1084 | }}
1085 | ```
1086 | Respond ONLY with the JSON object matching the schema.
1087 | """
1088 |
1089 |
1090 | # --- Call LLM using generate_json, passing image_bytes ---
1091 | response_obj = None
1092 | error_msg = None
1093 | try:
1094 | logger.debug("[LLM REPLAN] Sending prompt (and potentially image) to generate_json...")
1095 | response_obj = self.llm_client.generate_json(
1096 | ReplanSchema,
1097 | prompt,
1098 | image_bytes=screenshot_bytes # Pass image here
1099 | )
1100 | logger.debug(f"[LLM REPLAN] Raw response object type: {type(response_obj)}")
1101 |
1102 |
1103 | except Exception as e:
1104 | logger.error(f"LLM call failed during re-planning: {e}", exc_info=True)
1105 | print("Error: Could not communicate with LLM for re-planning.")
1106 | error_msg = f"LLM communication error: {e}"
1107 |
1108 | # --- Parse Response ---
1109 | recovery_steps = None
1110 | abort_action = False
1111 | abort_reasoning = "No reason provided."
1112 |
1113 | if isinstance(response_obj, ReplanSchema):
1114 | logger.debug(f"[LLM REPLAN] Successfully parsed response: {response_obj}")
1115 | if response_obj.recovery_steps and isinstance(response_obj.recovery_steps, list):
1116 | if all(isinstance(s, str) and s for s in response_obj.recovery_steps):
1117 | recovery_steps = response_obj.recovery_steps
1118 | else:
1119 | logger.warning(f"[LLM REPLAN] Parsed recovery_steps list contains invalid items: {response_obj.recovery_steps}")
1120 | error_msg = "LLM provided invalid recovery steps."
1121 | elif response_obj.action == "abort":
1122 | abort_action = True
1123 | abort_reasoning = response_obj.reasoning or "No specific reason provided by AI."
1124 | logger.warning(f"AI recommended aborting recording. Reason: {abort_reasoning}")
1125 | else:
1126 | logger.warning("[LLM REPLAN] LLM response did not contain valid recovery steps or an abort action.")
1127 | error_msg = "LLM response was valid JSON but lacked recovery_steps or abort action."
1128 |
1129 | elif isinstance(response_obj, str): # Handle error string from generate_json
1130 | logger.error(f"[LLM REPLAN] Failed to generate/parse recovery JSON: {response_obj}")
1131 | error_msg = f"LLM generation/parsing error: {response_obj}"
1132 | elif response_obj is None and error_msg: # Handle communication error from above
1133 | pass # error_msg already set
1134 | else: # Handle unexpected return type
1135 | logger.error(f"[LLM REPLAN] Unexpected response type from generate_json: {type(response_obj)}")
1136 | error_msg = f"Unexpected response type: {type(response_obj)}"
1137 |
1138 | # --- Handle Outcome (Mode-dependent) ---
1139 | if abort_action:
1140 | print(f"\nAI Suggests Aborting: {abort_reasoning}")
1141 | if self.automated_mode:
1142 | print("[Auto Mode] Accepting AI abort suggestion.")
1143 | abort_choice = 'a'
1144 | else:
1145 | abort_choice = input("AI suggests aborting. Abort (A) or Ignore and Skip Failed Step (S)? > ").strip().lower()
1146 |
1147 | if abort_choice == 'a':
1148 | self._user_abort_recording = True # Mark for abort
1149 | self._abort_reason = abort_reasoning
1150 | self.task_manager.update_subtask_status(current_planned_task['index'], "failed", error=f"Aborted based on AI re-planning suggestion: {abort_reasoning}", force_update=True)
1151 | return False # Abort
1152 | else: # Skipped (Interactive only)
1153 | logger.info("User chose to ignore AI abort suggestion and skip the failed step.")
1154 | self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped after AI suggested abort", force_update=True)
1155 | return False # Didn't insert steps
1156 |
1157 | elif recovery_steps:
1158 | print("\nAI Suggested Recovery Steps:")
1159 | for i, step in enumerate(recovery_steps): print(f" {i+1}. {step}")
1160 |
1161 | if self.automated_mode:
1162 | print("[Auto Mode] Accepting AI recovery steps.")
1163 | confirm_recovery = 'y'
1164 | else:
1165 | confirm_recovery = input("Attempt these recovery steps? (Y/N/Abort) > ").strip().lower()
1166 |
1167 | if confirm_recovery == 'y':
1168 | logger.info(f"Attempting AI recovery steps: {recovery_steps}")
1169 | if self._insert_recovery_steps(current_planned_task['index'] + 1, recovery_steps):
1170 | # Record that the original step's intent is being handled by a re-plan
1171 | self.recorded_steps.append({
1172 | "step_id": self._current_step_id,
1173 | "action": "task_replanned", # Changed action name
1174 | "description": current_planned_task['description'], # Use original task description
1175 | "parameters": {
1176 | "reason_for_replan": reason,
1177 | "recovery_steps_planned": recovery_steps,
1178 | "original_task_index_in_plan": current_planned_task['index'] # For traceability
1179 | },
1180 | "selector": None,
1181 | "wait_after_secs": 0
1182 | })
1183 | self._current_step_id += 1
1184 |
1185 | # Mark the original task as 'done' in TaskManager as its outcome is now via these recovery steps
1186 | self.task_manager.update_subtask_status(
1187 | current_planned_task['index'],
1188 | "done",
1189 | result=f"Step handled by re-planning. Details recorded as 'task_replanned'. Recovery steps: {recovery_steps}",
1190 | force_update=True # Ensure status is updated
1191 | )
1192 |
1193 | self._consecutive_suggestion_failures = 0
1194 | return True # Indicate recovery steps were inserted
1195 | else: # Insertion failed (should be rare)
1196 | print("Error: Failed to insert recovery steps. Skipping original failed step.")
1197 | self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (failed to insert AI recovery steps)", force_update=True)
1198 | return False
1199 | elif confirm_recovery == 'a': # Interactive only
1200 | self._user_abort_recording = True
1201 | return False # Abort
1202 | else: # N or invalid (Interactive or failed auto-acceptance)
1203 | print("Skipping recovery attempt and the original failed step.")
1204 | logger.info("User declined/skipped AI recovery steps. Skipping original failed step.")
1205 | self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (User/Auto declined AI recovery)", force_update=True)
1206 | return False # Skipped
1207 |
1208 | else: # LLM failed to provide valid steps or abort
1209 | print(f"\nAI failed to provide valid recovery steps or an abort action. Reason: {error_msg or 'Unknown LLM issue'}")
1210 | if self.automated_mode:
1211 | print("[Auto Mode] Skipping failed step due to LLM re-planning failure.")
1212 | skip_choice = 's'
1213 | else:
1214 | skip_choice = input("Skip the current failed step (S) or Abort recording (A)? > ").strip().lower()
1215 |
1216 | if skip_choice == 'a': # Interactive only possibility
1217 | self._user_abort_recording = True
1218 | return False # Abort
1219 | else: # Skip (default for auto mode, or user choice)
1220 | print("Skipping the original failed step.")
1221 | logger.warning(f"LLM failed re-planning ({error_msg}). Skipping original failed step.")
1222 | self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result=f"Skipped (AI re-planning failed: {error_msg})", force_update=True)
1223 | return False # Skipped
1224 |
1225 |
1226 | def _insert_recovery_steps(self, index: int, recovery_steps: List[str]) -> bool:
1227 | """Calls TaskManager to insert steps."""
1228 | return self.task_manager.insert_subtasks(index, recovery_steps)
1229 |
1230 | def _determine_action_and_selector_for_recording(self,
1231 | current_task: Dict[str, Any],
1232 | current_url: str,
1233 | dom_context_str: str # Now contains indexed elements with PRE-GENERATED selectors
1234 | ) -> Optional[Dict[str, Any]]: # Keep return type as Dict for downstream compatibility
1235 | """
1236 | Uses LLM (generate_json) to propose the browser action (click, type) and identify the target *element index*
1237 | based on the planned step description and the DOM context. The robust selector is retrieved
1238 | from the DOM state afterwards. Returns a dictionary representation or None on error.
1239 | """
1240 | logger.info(f"Determining AI suggestion for planned step: '{current_task['description']}'")
1241 |
1242 | prompt = f"""
1243 | You are an AI assistant helping a user record a web test. Your goal is to interpret the user's planned step and identify the **single target interactive element** in the provided context that corresponds to it, then suggest the appropriate action.
1244 |
1245 | **Feature Under Test:** {self.feature_description}
1246 | **Current Planned Step:** {current_task['description']}
1247 | **Current URL:** {current_url}
1248 | **Test Recording Progress:** Attempt {current_task['attempts']} of {self.task_manager.max_retries_per_subtask + 1} for this suggestion.
1249 |
1250 | **Input Context (Visible Interactive Elements with Indices):**
1251 | This section shows interactive elements on the page, each marked with `[index]` and its pre-generated robust CSS selector. You can also interact with the non visible elements
1252 | ```html
1253 | {dom_context_str}
1254 | ```
1255 |
1256 | **Your Task:**
1257 | Based ONLY on the "Current Planned Step" description and the "Input Context":
1258 | 1. Determine the appropriate **action** (`click`, `type`, `select`, `check`, `uncheck`, `key_press`, `drag_and_drop`, `action_not_applicable`, `suggestion_failed`).
1259 | 2. If action is `click`, `type`, `select`, `check`, `uncheck`, or `key_press`:
1260 | * Identify the **single most likely interactive element `[index]`** from the context that matches the description. Set `parameters.index`.
1261 | 3. If action is `type`: Extract the **text** to be typed. Set `parameters.text`.
1262 | 4. If action is `select`: Identify the main `<select>` element index and extract the target option's visible label into `parameters.option_label`.
1263 | 5. If action is `key_press`: Identify the target element `[index]` and extract the key(s) to press. Set `parameters.keys`.
1264 | 6. If action is `drag_and_drop`: Identify the source element `[index]` and the target element `[target_index]`.
1265 | 7. Provide brief **reasoning** linking the step description to the chosen index/action/parameters.
1266 |
1267 | **Output JSON Structure Examples:**
1268 |
1269 | *Click Action:*
1270 | ```json
1271 | {{
1272 | "action": "click",
1273 | "parameters": {{"index": 12}},
1274 | "reasoning": "The step asks to click the 'Login' button, which corresponds to element [12]."
1275 | }}
1276 | ```
1277 | *Type Action:*
1278 | ```json
1279 | {{
1280 | "action": "type",
1281 | "parameters": {{"index": 5, "text": "[email protected]"}},
1282 | "reasoning": "The step asks to type '[email protected]' into the email field, which is element [5]."
1283 | }}
1284 | ```
1285 | *Check Action:*
1286 | ```json
1287 | {{
1288 | "action": "check",
1289 | "parameters": {{"index": 8}},
1290 | "reasoning": "Step asks to check the 'Agree' checkbox [8]."
1291 | }}
1292 | ```
1293 | *Uncheck Action:*
1294 | ```json
1295 | {{
1296 | "action": "uncheck",
1297 | "parameters": {{"index": 9}},
1298 | "reasoning": "Step asks to uncheck 'Subscribe' [9]."
1299 | }}
1300 | *Key Press:*
1301 | ```json
1302 | {{
1303 | "action": "key_press",
1304 | "parameters": {{"index": 3, "keys": "Enter"}},
1305 | "reasoning": "The step asks to press Enter on the search input [3]."
1306 | }}
1307 | ```
1308 | *Drag and Drop:*
1309 | ```json
1310 | {{
1311 | "action": "drag_and_drop",
1312 | "parameters": {{"index": 10, "destination_index": 15}},
1313 | "reasoning": "The step asks to drag the item [10] to the cart area [15]."
1314 | }}
1315 | ```
1316 | ```json
1317 | {{
1318 | "action": "select",
1319 | "parameters": {{"index": 12, "option_label": "Weekly"}},
1320 | "reasoning": "The step asks to select 'Weekly' in the 'Notification Frequency' dropdown [12]."
1321 | }}
1322 | ```
1323 | ```
1324 | *Not Applicable (Navigation/Verification):*
1325 | ```json
1326 | {{
1327 | "action": "action_not_applicable",
1328 | "parameters": {{}},
1329 | "reasoning": "The step 'Navigate to ...' does not involve clicking or typing on an element from the context."
1330 | }}
1331 | ```
1332 | *Suggestion Failed (Cannot identify element):*
1333 | ```json
1334 | {{
1335 | "action": "suggestion_failed",
1336 | "parameters": {{}},
1337 | "reasoning": "Could not find a unique element matching 'the second confirmation button'."
1338 | }}
1339 | ```
1340 |
1341 | **CRITICAL INSTRUCTIONS:**
1342 | - Focus on the `[index]` and Do NOT output selectors for `click`/`type` actions.
1343 | - For `select` action, identify the main `<select>` element index and extract the target option's label into `parameters.option_label`.
1344 | - For `key_press`, provide the target `index` and the `keys` string.
1345 | - For `drag_and_drop`, provide the source `index` and the `target_index`
1346 | - Use `action_not_applicable` for navigation, verification, scroll, wait steps.
1347 | - Be precise with extracted `text` for the `type` action.
1348 |
1349 | Respond ONLY with the JSON object matching the schema.
1350 | """
1351 | # --- End Prompt ---
1352 |
1353 | # Add error context if retrying suggestion
1354 | if current_task['status'] == 'in_progress' and current_task['attempts'] > 1 and current_task.get('error'):
1355 | error_context = str(current_task['error'])[:300] + "..."
1356 | prompt += f"\n**Previous Suggestion Attempt Error:**\nAttempt {current_task['attempts'] - 1} failed: {error_context}\nRe-evaluate the description and context carefully.\n"
1357 |
1358 | # Add history summary for general context
1359 | prompt += f"\n**Recent History (Context):**\n{self._get_history_summary()}\n"
1360 |
1361 | logger.debug(f"[LLM RECORDER PROMPT] Sending prompt snippet for action/index suggestion:\n{prompt[:500]}...")
1362 |
1363 | response_obj = self.llm_client.generate_json(RecorderSuggestionSchema, prompt)
1364 |
1365 | suggestion_dict = None # Initialize
1366 | suggestion_failed = False
1367 | failure_reason = "LLM suggestion generation failed."
1368 |
1369 | if isinstance(response_obj, RecorderSuggestionSchema):
1370 | logger.debug(f"[LLM RECORDER RESPONSE] Parsed suggestion: {response_obj}")
1371 | # Convert to dict for downstream use (or refactor downstream to use object)
1372 | suggestion_dict = response_obj.model_dump(exclude_none=True)
1373 | action = suggestion_dict.get("action")
1374 | reasoning = suggestion_dict.get("reasoning", "No reasoning provided.")
1375 | logger.info(f"[LLM Suggestion] Action: {action}, Params: {suggestion_dict.get('parameters')}, Reasoning: {reasoning[:150]}...")
1376 | self._add_to_history("LLM Suggestion", suggestion_dict)
1377 |
1378 | # --- Basic Validation (Schema handles enum/types) ---
1379 | required_index_actions = ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]
1380 | if action in required_index_actions:
1381 | target_index = suggestion_dict.get("parameters", {}).get("index")
1382 | if target_index is None: # Index is required for these actions
1383 | logger.error(f"LLM suggested action '{action}' but missing required index.")
1384 | suggestion_failed = True
1385 | failure_reason = f"LLM suggestion '{action}' missing required parameter 'index'."
1386 | elif action == "key_press" and suggestion_dict.get("parameters", {}).get("keys") is None:
1387 | logger.error("LLM suggested action 'key_press' but missing required keys.")
1388 | suggestion_failed = True
1389 | failure_reason = "LLM suggestion 'key_press' missing required parameter 'keys'."
1390 | elif action == "drag_and_drop" and suggestion_dict.get("parameters", {}).get("target_index") is None:
1391 | logger.error("LLM suggested action 'drag_and_drop' but missing required target_index.")
1392 | suggestion_failed = True
1393 | failure_reason = "LLM suggestion 'drag_and_drop' missing required parameter 'target_index'."
1394 | elif action == "type" and suggestion_dict.get("parameters", {}).get("text") is None:
1395 | logger.error(f"LLM suggested action 'type' but missing required text.")
1396 | suggestion_failed = True
1397 | failure_reason = f"LLM suggestion 'type' missing required parameter 'text'."
1398 |
1399 | elif action == "suggestion_failed":
1400 | suggestion_failed = True
1401 | failure_reason = suggestion_dict.get("reasoning", "LLM indicated suggestion failed.")
1402 |
1403 | elif action == "action_not_applicable":
1404 | pass # This is a valid outcome, handled below
1405 |
1406 | else: # Should not happen if schema enum is enforced
1407 | logger.error(f"LLM returned unexpected action type: {action}")
1408 | suggestion_failed = True
1409 | failure_reason = f"LLM returned unknown action '{action}'."
1410 |
1411 | elif isinstance(response_obj, str): # Handle error string
1412 | logger.error(f"[LLM Suggestion Failed] LLM returned an error string: {response_obj}")
1413 | self._add_to_history("LLM Suggestion Failed", {"raw_error_response": response_obj})
1414 | suggestion_failed = True
1415 | failure_reason = f"LLM error: {response_obj}"
1416 | else: # Handle unexpected type
1417 | logger.error(f"[LLM Suggestion Failed] Unexpected response type from generate_json: {type(response_obj)}")
1418 | self._add_to_history("LLM Suggestion Failed", {"response_type": str(type(response_obj))})
1419 | suggestion_failed = True
1420 | failure_reason = f"Unexpected response type: {type(response_obj)}"
1421 |
1422 | # --- Process Suggestion ---
1423 | if suggestion_failed:
1424 | # Return a standardized failure dictionary
1425 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": failure_reason}
1426 |
1427 | # Handle successful suggestions (click, type, not_applicable)
1428 | if suggestion_dict["action"] in required_index_actions:
1429 | target_index = suggestion_dict["parameters"]["index"] # We validated index exists above
1430 |
1431 | # --- Retrieve the node and pre-generated selector ---
1432 | if self._latest_dom_state is None or not self._latest_dom_state.selector_map:
1433 | logger.error("DOM state or selector map is missing, cannot lookup suggested index.")
1434 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal error: DOM state unavailable."}
1435 |
1436 | target_node = self._latest_dom_state.selector_map.get(target_index)
1437 | if target_node is None:
1438 | available_indices = list(self._latest_dom_state.selector_map.keys())
1439 | logger.error(f"LLM suggested index [{target_index}], but it was not found in DOM context map. Available: {available_indices}")
1440 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{target_index}] not found in current page context."}
1441 |
1442 | suggested_selector = target_node.css_selector
1443 | if not suggested_selector:
1444 | # Try to generate it now if missing
1445 | suggested_selector = self.browser_controller.get_selector_for_node(target_node)
1446 | if suggested_selector:
1447 | target_node.css_selector = suggested_selector # Cache it
1448 | else:
1449 | logger.error(f"Could not generate selector for suggested index [{target_index}] (Node: {target_node.tag_name}).")
1450 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{target_index}]."}
1451 |
1452 | if suggested_selector and target_node and self.browser_controller.page:
1453 | try:
1454 | handles = self.browser_controller.page.query_selector_all(suggested_selector)
1455 | num_matches = len(handles)
1456 | validation_passed = False
1457 |
1458 | if num_matches == 1:
1459 | # Get XPath of the element found by the generated selector
1460 | # Note: Requires a reliable JS function 'generateXPath' accessible in evaluate
1461 | try:
1462 | with resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js') as js_path:
1463 | js_code = js_path.read_text(encoding='utf-8')
1464 | logger.debug("xpathgenerator.js loaded successfully.")
1465 | except FileNotFoundError:
1466 | logger.error("xpathgenerator.js not found in the 'agents' package directory!")
1467 | raise
1468 | except Exception as e:
1469 | logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True)
1470 | raise
1471 |
1472 | # Pass a *function string* to evaluate. Playwright passes the handle as 'element'.
1473 | # The function string first defines our helper, then calls it.
1474 | script_to_run = f"""
1475 | (element) => {{
1476 | {js_code} // Define the helper function(s)
1477 | return generateXPathForElement(element); // Call it
1478 | }}
1479 | """
1480 |
1481 | # Use page.evaluate, passing the script string and the element handle
1482 | matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0])
1483 |
1484 | # Compare XPaths
1485 | if target_node.xpath == matched_xpath:
1486 | validation_passed = True
1487 | logger.info(f"Validation PASSED: Suggested selector '{suggested_selector}' uniquely matches target node [{target_index}].")
1488 | else:
1489 | logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').")
1490 | elif num_matches == 0:
1491 | logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 0 elements.")
1492 | else: # num_matches > 1
1493 | logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched {num_matches} elements (not unique).")
1494 |
1495 | # --- Fallback to XPath if validation failed ---
1496 | if not validation_passed:
1497 | logger.warning(f"Falling back to XPath selector for target node [{target_index}].")
1498 | original_selector = suggested_selector
1499 | suggested_selector = f"xpath={target_node.xpath}"
1500 | # Update the suggestion dictionary
1501 | suggestion_dict["suggested_selector"] = suggested_selector
1502 | suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]"
1503 |
1504 | except Exception as validation_err:
1505 | logger.error(f"Error during selector validation ('{suggested_selector}'): {validation_err}. Falling back to XPath.")
1506 | original_selector = suggested_selector
1507 | suggested_selector = f"xpath={target_node.xpath}"
1508 | # Update the suggestion dictionary
1509 | suggestion_dict["suggested_selector"] = suggested_selector
1510 | suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]"
1511 |
1512 | logger.info(f"LLM suggested index [{target_index}], resolved to selector: '{suggested_selector}'")
1513 | # Add resolved selector and node to the dictionary returned
1514 | suggestion_dict["suggested_selector"] = suggested_selector
1515 | suggestion_dict["target_node"] = target_node
1516 |
1517 | if suggestion_dict["action"] == "drag_and_drop":
1518 | destination_index = suggestion_dict["parameters"].get("destination_index")
1519 |
1520 | if destination_index is not None:
1521 | destination_node = self._latest_dom_state.selector_map.get(destination_index)
1522 |
1523 | if destination_node is None:
1524 | available_indices = list(self._latest_dom_state.selector_map.keys())
1525 | logger.error(f"LLM suggested index [{destination_index}], but it was not found in DOM context map. Available: {available_indices}")
1526 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{destination_index}] not found in current page context."}
1527 | destination_selector = destination_node.css_selector
1528 | if not destination_selector:
1529 | # Try to generate it now if missing
1530 | destination_selector = self.browser_controller.get_selector_for_node(destination_node)
1531 | if destination_selector:
1532 | destination_node.css_selector = destination_selector # Cache it
1533 | else:
1534 | logger.error(f"Could not generate selector for suggested index [{destination_index}] (Node: {destination_node.tag_name}).")
1535 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{destination_index}]."}
1536 |
1537 | suggestion_dict["destination_selector"] = destination_selector
1538 | suggestion_dict["destination_node"] = destination_node
1539 | logger.info(f"LLM suggested drag target index [{destination_index}], resolved to selector: '{destination_selector}'")
1540 | else: # Should have been caught by validation, but double-check
1541 | logger.error("LLM suggested drag_and_drop without destination_index.")
1542 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Drag and drop suggestion missing destination index."}
1543 |
1544 | return suggestion_dict
1545 |
1546 | elif suggestion_dict["action"] == "action_not_applicable":
1547 | # Pass this through directly
1548 | return suggestion_dict
1549 |
1550 | else: # Should be unreachable given the checks above
1551 | logger.error("Reached unexpected point in suggestion processing.")
1552 | return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal processing error after LLM response."}
1553 |
1554 |
1555 | def _execute_action_for_recording(self, action: str, selector: Optional[str], parameters: Dict[str, Any]) -> Dict[str, Any]:
1556 | """
1557 | Executes a specific browser action (navigate, click, type) during recording.
1558 | This is called *after* user confirmation/override. It does not involve AI decision.
1559 | """
1560 | result = {"success": False, "message": f"Action '{action}' invalid.", "data": None}
1561 |
1562 | if not action:
1563 | result["message"] = "No action specified for execution."
1564 | logger.warning(f"[RECORDER_EXEC] {result['message']}")
1565 | return result
1566 |
1567 | logger.info(f"[RECORDER_EXEC] Executing: {action} | Selector: {selector} | Params: {parameters}")
1568 | self._add_to_history("Executing Recorder Action", {"action": action, "selector": selector, "parameters": parameters})
1569 |
1570 | try:
1571 | if action == "navigate":
1572 | url = parameters.get("url")
1573 | if not url or not isinstance(url, str): raise ValueError("Missing or invalid 'url'.")
1574 | self.browser_controller.goto(url)
1575 | result["success"] = True
1576 | result["message"] = f"Navigated to {url}."
1577 | # Add implicit wait for load state after navigation
1578 | self.recorded_steps.append({
1579 | "step_id": self._current_step_id, # Use internal counter
1580 | "action": "wait_for_load_state",
1581 | "description": "Wait for page navigation to complete",
1582 | "parameters": {"state": "domcontentloaded"}, # Reasonable default
1583 | "selector": None,
1584 | "wait_after_secs": 0
1585 | })
1586 | self._current_step_id += 1 # Increment after adding implicit step
1587 |
1588 | elif action == "click":
1589 | if not selector: raise ValueError("Missing selector for click action.")
1590 | self.browser_controller.click(selector)
1591 | time.sleep(0.5)
1592 | result["success"] = True
1593 | result["message"] = f"Clicked element: {selector}."
1594 |
1595 | elif action == "type":
1596 | text = parameters.get("text")
1597 | if not selector: raise ValueError("Missing selector for type action.")
1598 | if text is None: raise ValueError("Missing or invalid 'text'.") # Allow empty string? yes.
1599 | self.browser_controller.type(selector, text)
1600 | result["success"] = True
1601 | result["message"] = f"Typed into element: {selector}."
1602 |
1603 | elif action == "scroll": # Basic scroll support if planned
1604 | direction = parameters.get("direction")
1605 | if direction not in ["up", "down"]: raise ValueError("Invalid scroll direction.")
1606 | self.browser_controller.scroll(direction)
1607 | result["success"] = True
1608 | result["message"] = f"Scrolled {direction}."
1609 |
1610 | elif action == "check":
1611 | if not selector: raise ValueError("Missing selector for check action.")
1612 | self.browser_controller.check(selector)
1613 | result["success"] = True
1614 | result["message"] = f"Checked element: {selector}."
1615 |
1616 | elif action == "uncheck":
1617 | if not selector: raise ValueError("Missing selector for uncheck action.")
1618 | self.browser_controller.uncheck(selector)
1619 | result["success"] = True
1620 | result["message"] = f"Unchecked element: {selector}."
1621 | elif action == "select":
1622 | option_label = parameters.get("option_label")
1623 | # option_value = parameters.get("option_value") # If supporting value selection
1624 | if not selector: raise ValueError("Missing selector for select action.")
1625 | if not option_label: # and not option_value: # Prioritize label
1626 | raise ValueError("Missing 'option_label' parameter for select action.")
1627 |
1628 | logger.info(f"Selecting option by label '{option_label}' in element: {selector}")
1629 | # Use the main browser_controller page reference
1630 | locator = self.browser_controller._get_locator(selector) # Use helper to get locator
1631 | # select_option can take label, value, or index
1632 | locator.select_option(label=option_label, timeout=self.browser_controller.default_action_timeout)
1633 | result["success"] = True
1634 | result["message"] = f"Selected option '{option_label}' in element: {selector}."
1635 | elif action == "key_press":
1636 | keys = parameters.get("keys")
1637 | if not selector: raise ValueError("Missing selector for key_press action.")
1638 | if not keys: raise ValueError("Missing 'keys' parameter for key_press action.")
1639 | self.browser_controller.press(selector, keys)
1640 | result["success"] = True
1641 | result["message"] = f"Pressed '{keys}' on element: {selector}."
1642 |
1643 | elif action == "drag_and_drop":
1644 | destination_selector = parameters.get("destination_selector")
1645 | if not selector: raise ValueError("Missing source selector for drag_and_drop.")
1646 | if not destination_selector: raise ValueError("Missing 'destination_selector' parameter for drag_and_drop.")
1647 | self.browser_controller.drag_and_drop(selector, destination_selector)
1648 | result["success"] = True
1649 | result["message"] = f"Dragged '{selector}' to '{destination_selector}'."
1650 |
1651 | # NOTE: "wait" actions are generally not *executed* during recording,
1652 | # they are just recorded based on the plan.
1653 |
1654 |
1655 | else:
1656 | result["message"] = f"Action '{action}' is not directly executable during recording via this method."
1657 | logger.warning(f"[RECORDER_EXEC] {result['message']}")
1658 |
1659 |
1660 | except (PlaywrightError, PlaywrightTimeoutError, ValueError) as e:
1661 | error_msg = f"Execution during recording failed for action '{action}' on selector '{selector}': {type(e).__name__}: {e}"
1662 | logger.error(f"[RECORDER_EXEC] {error_msg}", exc_info=False)
1663 | result["message"] = error_msg
1664 | result["success"] = False
1665 | # Optionally save screenshot on execution failure *during recording*
1666 | try:
1667 | ts = time.strftime("%Y%m%d_%H%M%S")
1668 | fname = f"output/recorder_exec_fail_{action}_{ts}.png"
1669 | self.browser_controller.save_screenshot(fname)
1670 | logger.info(f"Saved screenshot on recorder execution failure: {fname}")
1671 | except: pass # Ignore screenshot errors here
1672 |
1673 | except Exception as e:
1674 | error_msg = f"Unexpected Error during recorder execution action '{action}': {type(e).__name__}: {e}"
1675 | logger.critical(f"[RECORDER_EXEC] {error_msg}", exc_info=True)
1676 | result["message"] = error_msg
1677 | result["success"] = False
1678 |
1679 | # Log Action Result
1680 | log_level = logging.INFO if result["success"] else logging.WARNING
1681 | logger.log(log_level, f"[RECORDER_EXEC_RESULT] Action '{action}' | Success: {result['success']} | Message: {result['message']}")
1682 | self._add_to_history("Recorder Action Result", {"success": result["success"], "message": result["message"]})
1683 |
1684 | return result
1685 |
1686 | # --- New Recorder Core Logic ---
1687 |
1688 | def _handle_interactive_step_recording(self, planned_step: Dict[str, Any], suggestion: Dict[str, Any]) -> bool:
1689 | """
1690 | Handles the user interaction loop for a suggested 'click' or 'type' action.
1691 | Returns True if the step was successfully recorded (or skipped), False if aborted.
1692 | """
1693 | action = suggestion["action"]
1694 | suggested_selector = suggestion["suggested_selector"] # source selector
1695 | target_node = suggestion["target_node"] # DOMElementNode # source node
1696 | destination_selector = suggestion.get("destination_selector")
1697 | destination_node = suggestion.get("destination_node")
1698 | parameters = suggestion["parameters"] # Contains index and potentially text
1699 | reasoning = suggestion.get("reasoning", "N/A")
1700 | planned_desc = planned_step["description"]
1701 |
1702 | final_selector = None
1703 | performed_action = False
1704 | user_choice = None
1705 | parameter_name = None
1706 | action_recorded = False # Flag to track if we actually recorded the step
1707 |
1708 | # --- Automated Mode ---
1709 | if self.automated_mode:
1710 | logger.info(f"[Auto Mode] Handling AI suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})")
1711 | logger.info(f"[Auto Mode] Suggested Selector: {suggested_selector}")
1712 | if action == "drag_and_drop":
1713 | logger.info(f"[Auto Mode] Suggested Target Selector: {destination_selector}")
1714 | self.browser_controller.clear_highlights()
1715 | self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion")
1716 |
1717 | if action == "drag_and_drop" and destination_selector and destination_node:
1718 | self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Target)")
1719 |
1720 | # Directly accept AI suggestion
1721 | final_selector = suggested_selector
1722 | final_destination_selector = destination_selector
1723 | logger.info(f"[Auto Mode] Automatically accepting AI suggestion.")
1724 |
1725 | exec_params = parameters.copy() # Start with base params
1726 | if action == "drag_and_drop":
1727 | exec_params["destination_selector"] = final_destination_selector # Add target for execution
1728 |
1729 |
1730 | # Execute action on AI's suggested selector
1731 | exec_result = self._execute_action_for_recording(action, final_selector, exec_params)
1732 | performed_action = exec_result["success"]
1733 |
1734 | if performed_action:
1735 | # Record the successful step automatically
1736 | record = {
1737 | "step_id": self._current_step_id, "action": action, "description": planned_desc,
1738 | "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
1739 | }
1740 | if action == "type":
1741 | # Include text, but no parameterization prompt
1742 | record["parameters"]["text"] = parameters.get("text", "")
1743 | elif action == "select":
1744 | # Ensure 'option_label' from the suggestion's parameters is added
1745 | if "option_label" in parameters:
1746 | record["parameters"]["option_label"] = parameters["option_label"]
1747 | if "option_value" in parameters: record["parameters"]["option_value"] = parameters["option_value"]
1748 | if "option_index" in parameters: record["parameters"]["option_index"] = parameters["option_index"]
1749 | elif action == "key_press":
1750 | record["parameters"]["keys"] = parameters.get("keys", "")
1751 | elif action == "drag_and_drop":
1752 | record["parameters"]["destination_selector"] = final_destination_selector
1753 |
1754 |
1755 | self.recorded_steps.append(record)
1756 | self._current_step_id += 1
1757 | logger.info(f"Step {record['step_id']} recorded (AI Suggestion - Automated): {action} on {final_selector}")
1758 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion (automated) as step {record['step_id']}")
1759 | return True # Success
1760 |
1761 | else: # AI suggestion execution failed
1762 | logger.error(f"[Auto Mode] Execution FAILED using AI suggested selector: {exec_result['message']}")
1763 | # Mark as failed for potential re-planning or retry in the main loop
1764 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Automated execution failed: {exec_result['message']}")
1765 | self.browser_controller.clear_highlights()
1766 | self.panel.hide_recorder_panel()
1767 | # Do not abort automatically, let the main loop handle failure/retry logic
1768 | return True # Indicate handled (failure noted), loop continues
1769 | # --- Interactive Mode ---
1770 | else:
1771 |
1772 |
1773 | print("\n" + "="*60)
1774 | print(f"Planned Step: {planned_desc}")
1775 | print(f"AI Suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})")
1776 | print(f"Suggested Selector: {suggested_selector}")
1777 | if action == "drag_and_drop":
1778 | print(f"Suggested Selector (Destination): {destination_selector}")
1779 | print("="*60)
1780 |
1781 | # Highlight suggested element
1782 | self.browser_controller.clear_highlights()
1783 | self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion") # Orange for suggestion
1784 | if action == "drag_and_drop" and destination_selector and destination_node:
1785 | self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Destination)")
1786 |
1787 | # Show the UI Panel with options
1788 | suggestion_display_text = f"'{action}' on <{target_node.tag_name}>"
1789 | if action == "type": suggestion_display_text += f" with text '{parameters.get('text', '')[:20]}...'"
1790 | elif action == "key_press": suggestion_display_text += f" with key(s) '{parameters.get('keys', '')}'"
1791 | elif action == "drag_and_drop": suggestion_display_text += f" to <{destination_node.tag_name if destination_node else 'N/A'}>"
1792 | self.panel.show_recorder_panel(planned_desc, suggestion_display_text)
1793 |
1794 | # Setup listener *after* showing panel, *before* waiting
1795 | listener_setup = self.browser_controller.setup_click_listener()
1796 | if not listener_setup:
1797 | logger.error("Failed to set up click listener, cannot proceed with override.")
1798 | self.panel.hide_recorder_panel()
1799 | return False # Abort
1800 |
1801 | # --- Wait for User Interaction (Click Override OR Panel Button) ---
1802 | # Total time budget for interaction
1803 | TOTAL_INTERACTION_TIMEOUT = 20.0 # e.g., 20 seconds total
1804 | PANEL_WAIT_TIMEOUT = 15.0 # Time to wait for panel *after* click timeout
1805 |
1806 | override_selector = None
1807 |
1808 | try:
1809 | logger.debug("Waiting for user click override...")
1810 | # Wait for click first (short timeout)
1811 | click_wait_time = TOTAL_INTERACTION_TIMEOUT - PANEL_WAIT_TIMEOUT
1812 | override_selector = self.browser_controller.wait_for_user_click_or_timeout(click_wait_time)
1813 |
1814 | if override_selector:
1815 | print(f"\n[Recorder] User override detected! Using selector: {override_selector}")
1816 | user_choice = 'override' # Special internal choice
1817 | else:
1818 | # Click timed out, now wait for panel interaction
1819 | logger.debug("No click override. Waiting for panel interaction...")
1820 | user_choice = self.panel.wait_for_panel_interaction(PANEL_WAIT_TIMEOUT)
1821 | if user_choice:
1822 | print(f"\n[Recorder] User choice via panel: {user_choice}")
1823 | else:
1824 | print("\n[Recorder] Timeout waiting for panel interaction. Skipping step.")
1825 | user_choice = 'skip' # Default to skip on timeout
1826 |
1827 | except Exception as e:
1828 | logger.error(f"Error during user interaction wait: {e}", exc_info=True)
1829 | user_choice = 'abort' # Abort on unexpected error
1830 |
1831 |
1832 | # --- Process User Choice ---
1833 | if user_choice == 'override':
1834 | final_selector = override_selector
1835 | final_destination_selector = destination_selector if action == "drag_and_drop" else None
1836 | performed_action = False
1837 | print(f"Executing original action '{action}' on overridden selector...")
1838 | exec_params = parameters.copy()
1839 | if action == "drag_and_drop":
1840 | exec_params["destination_selector"] = final_destination_selector
1841 | exec_result = self._execute_action_for_recording(action, final_selector, parameters)
1842 | performed_action = exec_result["success"]
1843 |
1844 | if performed_action:
1845 | # --- Ask for Parameterization (for 'type' action) ---
1846 | if action == "type":
1847 | param_text = parameters.get("text", "")
1848 | if self.panel.prompt_parameterization_in_panel(param_text):
1849 | print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...")
1850 | param_choice = self.panel.wait_for_panel_interaction(15.0) # Wait for param submit
1851 | if param_choice == 'parameterized':
1852 | parameter_name = self.panel.get_parameterization_result()
1853 | print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.")
1854 | else:
1855 | print("Parameterization skipped or timed out.")
1856 | else:
1857 | print("Could not show parameterization UI.")
1858 |
1859 | # --- Record the override step ---
1860 | record = { "step_id": self._current_step_id, "action": action, "description": planned_desc,
1861 | "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION }
1862 | if action == "type": record["parameters"]["text"] = parameters.get("text", "")
1863 | elif action == "select":
1864 | # Assume original parameters (like option_label) still apply for override
1865 | if "option_label" in parameters:
1866 | record["parameters"]["option_label"] = parameters["option_label"]
1867 | elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "")
1868 | elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector
1869 |
1870 | if parameter_name: record["parameters"]["parameter_name"] = parameter_name
1871 | self.recorded_steps.append(record)
1872 | self._current_step_id += 1
1873 | logger.info(f"Step {record['step_id']} recorded (User Override): {action} on {final_selector}")
1874 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded override as step {record['step_id']}")
1875 | action_recorded = True
1876 | else:
1877 | # Override execution failed
1878 | print(f"WARNING: Execution failed using override selector: {exec_result['message']}")
1879 | # Ask to skip or abort via panel again? Simpler to just skip here.
1880 | print("Skipping step after failed override execution.")
1881 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped after failed override execution")
1882 |
1883 |
1884 | elif user_choice == 'accept':
1885 | print("Accepting AI suggestion.")
1886 | final_selector = suggested_selector
1887 | final_destination_selector = destination_selector if action == "drag_and_drop" else None
1888 | performed_action = False
1889 | exec_params = parameters.copy()
1890 | if action == "drag_and_drop":
1891 | exec_params["destination_selector"] = final_destination_selector
1892 | exec_result = self._execute_action_for_recording(action, final_selector, parameters)
1893 | performed_action = exec_result["success"]
1894 |
1895 | if performed_action:
1896 | # --- Ask for Parameterization ---
1897 | if action == "type":
1898 | param_text = parameters.get("text", "")
1899 | if self.panel.prompt_parameterization_in_panel(param_text):
1900 | print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...")
1901 | param_choice = self.panel.wait_for_panel_interaction(15.0)
1902 | if param_choice == 'parameterized':
1903 | parameter_name = self.panel.get_parameterization_result()
1904 | print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.")
1905 | else:
1906 | print("Parameterization skipped or timed out.")
1907 | else:
1908 | print("Could not show parameterization UI.")
1909 |
1910 | # --- Record the accepted AI suggestion ---
1911 | record = { "step_id": self._current_step_id, "action": action, "description": planned_desc,
1912 | "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION }
1913 | if action == "type": record["parameters"]["text"] = parameters.get("text", "")
1914 | elif action == "select":
1915 | if "option_label" in parameters:
1916 | record["parameters"]["option_label"] = parameters["option_label"]
1917 | elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "")
1918 | elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector
1919 |
1920 | if parameter_name: record["parameters"]["parameter_name"] = parameter_name
1921 | self.recorded_steps.append(record)
1922 | self._current_step_id += 1
1923 | logger.info(f"Step {record['step_id']} recorded (AI Suggestion): {action} on {final_selector}")
1924 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion as step {record['step_id']}")
1925 | action_recorded = True
1926 | else:
1927 | # AI suggestion execution failed
1928 | print(f"WARNING: Execution failed using AI suggested selector: {exec_result['message']}")
1929 | # Ask to retry/skip/abort via panel again? Or mark as failed for main loop retry?
1930 | # Let's mark as failed for retry by the main loop.
1931 | print("Marking step for retry after execution failure...")
1932 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Execution failed: {exec_result['message']}")
1933 | # Action was NOT recorded in this case
1934 |
1935 | elif user_choice == 'skip':
1936 | print("Skipping planned step.")
1937 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped via panel")
1938 |
1939 | elif user_choice == 'abort':
1940 | print("Aborting recording process.")
1941 | self._user_abort_recording = True
1942 | # No need to update task manager status if aborting globally
1943 |
1944 | else: # Should not happen with panel, but handle defensively (e.g., timeout resulted in None)
1945 | print("Invalid choice or timeout. Skipping step.")
1946 | self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid user choice or timeout")
1947 |
1948 | # --- Cleanup UI after interaction ---
1949 | self.browser_controller.clear_highlights()
1950 | self.panel.hide_recorder_panel()
1951 | if listener_setup:
1952 | self.browser_controller.remove_click_listener() # Ensure listener removed
1953 |
1954 | return not self._user_abort_recording # Return True if handled (recorded/skipped/failed for retry), False only on ABORT
1955 |
    # NOTE(review): stub — the body is only a docstring, so this implicitly
    # returns None rather than the annotated Tuple. The target-index LLM call
    # it describes is currently inlined in _handle_assertion_recording below;
    # confirm whether this helper is dead code before calling it.
    def _get_llm_assertion_target_index(self, planned_desc: str, dom_context_str: str) -> Tuple[Optional[int], Optional[str]]:
        """Helper function to ask LLM for the target index for an assertion."""

    def _handle_assertion_recording(self, planned_step: Dict[str, Any]) -> bool:
        """
        Handles prompting the user for assertion details based on a 'Verify...' planned step.

        Flow: first ask the LLM for the index of the element most relevant to
        the verification, then drive a small panel-based state machine:
          1. target_selection - confirm the AI-suggested selector or override
             it by clicking an element on the page,
          2. type_selection   - pick the assertion type (visible, text equals, ...),
          3. param_input      - collect parameters for types that need them.
        On success the assertion is appended to self.recorded_steps.

        Returns True if recorded/skipped, False if aborted.
        """
        # In automated mode this handler should never be reached; skip defensively.
        if self.automated_mode:
            logger.error("[Auto Mode] Reached manual assertion handler. This indicates verification fallback failed or wasn't triggered. Skipping step.")
            self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped (Manual assertion handler reached in auto mode)")
            return True # Skip

        planned_desc = planned_step["description"]
        logger.info(f"Starting interactive assertion definition for: '{planned_desc}'")
        print("\n" + "="*60)
        print(f"Planned Step: {planned_desc}")
        print("Define Assertion via UI Panel...")
        print("="*60)

        current_state = "target_selection" # States: target_selection, type_selection, param_input
        suggested_selector = None
        final_selector = None
        assertion_action = None
        assertion_params = {}
        llm_target_suggestion_failed = False # Track if initial suggestion failed

        try:
            # 1. Identify Target Element (Use LLM again, simplified prompt)
            # We need a selector for the element to assert against.
            # NOTE(review): current_url, target_selector, and
            # llm_target_suggestion_failed are assigned but never read below —
            # likely leftovers; confirm before removing.
            current_url = self.browser_controller.get_current_url()
            dom_context_str = "Error getting DOM"
            if self._latest_dom_state:
                dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
            prompt = f"""
Given the verification step: "{planned_desc}"
And the current interactive elements context (with indices). You can interact with non visible elements in the tree too:
```html
{dom_context_str}
```
Identify the element index `[index]` most relevant to this verification task.
Respond ONLY with a JSON object matching the schema:
{{
"index": INDEX_NUMBER_OR_NULL,
"reasoning": "OPTIONAL_REASONING_IF_NULL"
}}

Example Output (Found): {{"index": 5}}
Example Output (Not Found): {{"index": null, "reasoning": "Cannot determine a single target element for 'verify presence of error'."}}
"""
            logger.debug(f"[LLM ASSERT PROMPT] Sending prompt for assertion target index:\n{prompt[:500]}...")

            response_obj = self.llm_client.generate_json(AssertionTargetIndexSchema, prompt)

            target_index = None
            llm_reasoning = "LLM did not provide a target index or reasoning." # Default

            if isinstance(response_obj, AssertionTargetIndexSchema):
                logger.debug(f"[LLM ASSERT RESPONSE] Parsed index response: {response_obj}")
                target_index = response_obj.index # Will be None if null in JSON
                if target_index is None and response_obj.reasoning:
                    llm_reasoning = response_obj.reasoning
                elif target_index is None:
                    llm_reasoning = "LLM did not identify a target element (index is null)."

            elif isinstance(response_obj, str): # Handle error string
                logger.error(f"[LLM ASSERT RESPONSE] Failed to get target index JSON: {response_obj}")
                llm_reasoning = f"LLM error getting target index: {response_obj}"
            else: # Handle unexpected type
                logger.error(f"[LLM ASSERT RESPONSE] Unexpected response type for target index: {type(response_obj)}")
                llm_reasoning = f"Unexpected LLM response type: {type(response_obj)}"

            target_node = None
            target_selector = None

            # Resolve the suggested index to a concrete CSS selector (if possible)
            # and highlight it so the user can confirm or override it.
            if target_index is not None:
                if self._latest_dom_state and self._latest_dom_state.selector_map:
                    target_node = self._latest_dom_state.selector_map.get(target_index)
                    if target_node and target_node.css_selector:
                        suggested_selector = target_node.css_selector
                        print(f"AI suggests target [Index: {target_index}]: <{target_node.tag_name}>")
                        print(f"  Selector: `{suggested_selector}`")
                        self.browser_controller.clear_highlights()
                        self.browser_controller.highlight_element(suggested_selector, target_index, color="#0000FF", text="Assert Target?")
                    else:
                        print(f"AI suggested index [{target_index}], but element/selector not found.")
                        llm_target_suggestion_failed = True
                else:
                    print(f"AI suggested index [{target_index}], but DOM map unavailable.")
                    llm_target_suggestion_failed = True
            else:
                print(f"AI could not suggest a target element. Reason: {llm_reasoning}")
                llm_target_suggestion_failed = True # Mark as failed if no index
        except Exception as e:
            logger.error(f"Error getting initial assertion target suggestion: {e}", exc_info=True)
            print(f"Error getting AI suggestion: {e}")
            llm_target_suggestion_failed = True




        # --- User confirms/overrides target selector ---
        # State machine loop: each iteration shows the panel for the current
        # state, waits for user input, then either transitions (`continue`)
        # or exits (`break`) to record/skip/abort.
        while True:
            user_choice = None
            override_selector = None

            # --- State 1: Target Selection ---
            if current_state == "target_selection":
                print("Panel State: Confirm or Override Target Selector.")
                self.panel.show_assertion_target_panel(planned_desc, suggested_selector)
                listener_setup = self.browser_controller.setup_click_listener()
                if not listener_setup:
                    logger.error("Failed to set up click listener for override.")
                    user_choice = 'abort' # Force abort if listener fails
                else:
                    # Wait for click override OR panel interaction
                    try:
                        logger.debug("Waiting for user click override (Assertion Target)...")
                        override_selector = self.browser_controller.wait_for_user_click_or_timeout(5.0) # 5s for click override
                        if override_selector:
                            print(f"\n[Recorder] User override target detected! Using selector: {override_selector}")
                            user_choice = 'override_target_confirmed' # Internal choice after click
                        else:
                            logger.debug("No click override. Waiting for panel interaction (Assertion Target)...")
                            user_choice = self.panel.wait_for_panel_interaction(15.0) # Wait longer for panel
                            if not user_choice: user_choice = 'skip' # Default to skip on panel timeout
                    except Exception as e:
                        logger.error(f"Error during assertion target interaction wait: {e}", exc_info=True)
                        user_choice = 'abort'

                # --- Process Target Choice ---
                self.browser_controller.remove_click_listener() # Remove listener after this stage

                if user_choice == 'confirm_target':
                    if suggested_selector:
                        final_selector = suggested_selector
                        print(f"Using suggested target: {final_selector}")
                        current_state = "type_selection" # Move to next state
                        continue # Restart loop in new state
                    else:
                        print("Error: Cannot confirm target, no suggestion was available.")
                        # Stay in this state or treat as skip? Let's allow retry.
                        current_state = "target_selection"
                        continue

                elif user_choice == 'override_target_confirmed': # Came from click override
                    if override_selector:
                        final_selector = override_selector
                        print(f"Using override target: {final_selector}")
                        # Try highlighting the user's choice
                        try:
                            self.browser_controller.clear_highlights()
                            self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target")
                        except Exception as e:
                            print(f"Warning: Could not highlight user selector '{final_selector}': {e}")
                        current_state = "type_selection" # Move to next state
                        continue # Restart loop in new state
                    else:
                        print("Error: Override click detected but no selector captured.")
                        current_state = "target_selection" # Stay here
                        continue

                elif user_choice == 'override_target': # Clicked button in panel to enable clicking
                    print("Click the element on the page you want to assert against...")
                    self.panel.hide_recorder_panel() # Hide panel while clicking
                    listener_setup = self.browser_controller.setup_click_listener()
                    if not listener_setup:
                        logger.error("Failed to set up click listener for override.")
                        user_choice = 'abort'
                    else:
                        try:
                            override_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0) # Longer wait for manual click
                            if override_selector:
                                print(f"\n[Recorder] User override target selected: {override_selector}")
                                user_choice = 'override_target_confirmed' # Set internal choice
                            else:
                                print("Timeout waiting for override click. Please try again.")
                                user_choice = None # Force loop restart in target_selection
                        except Exception as e:
                            logger.error(f"Error during manual override click wait: {e}", exc_info=True)
                            user_choice = 'abort'
                        self.browser_controller.remove_click_listener() # Remove listener

                    if user_choice == 'override_target_confirmed':
                        final_selector = override_selector
                        try:
                            self.browser_controller.clear_highlights()
                            self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target")
                        except Exception as e: print(f"Warning: Could not highlight user selector: {e}")
                        current_state = "type_selection"
                        continue
                    elif user_choice == 'abort':
                        self._user_abort_recording = True; break # Exit loop
                    else: # Timeout or error during manual click
                        current_state = "target_selection" # Go back to target panel
                        continue

                elif user_choice == 'skip':
                    print("Skipping assertion definition.")
                    self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion")
                    break # Exit loop, return True
                elif user_choice == 'abort':
                    self._user_abort_recording = True; break # Exit loop, return False
                else: # Includes timeout, None, unexpected values
                    print("Invalid choice or timeout. Please select target action.")
                    current_state = "target_selection" # Stay here
                    continue

            # --- State 2: Type Selection ---
            elif current_state == "type_selection":
                if not final_selector: # Should not happen if logic is correct
                    logger.error("Assertion state error: Reached type selection without a final selector.")
                    current_state = "target_selection"; continue # Go back

                print("Panel State: Select Assertion Type.")
                self.panel.show_assertion_type_panel(final_selector)
                user_choice = self.panel.wait_for_panel_interaction(20.0) # Wait for type selection
                if not user_choice: user_choice = 'skip' # Default to skip on timeout

                # --- Process Type Choice ---
                if user_choice.startswith('select_type_'):
                    type_suffix = user_choice.split('select_type_')[-1]
                    # Map suffix to actual action string
                    action_map = {
                        'text_contains': "assert_text_contains", 'text_equals': "assert_text_equals",
                        'visible': "assert_visible", 'hidden': "assert_hidden",
                        'attribute_equals': "assert_attribute_equals", 'element_count': "assert_element_count",
                        'checked': "assert_checked", 'not_checked': "assert_not_checked", "disabled": "assert_disabled", "enabled": "assert_enabled", "vision_llm": "assert_llm_verification"
                    }
                    assertion_action = action_map.get(type_suffix)
                    if not assertion_action:
                        print(f"Error: Unknown assertion type selected '{type_suffix}'.")
                        current_state = "type_selection"; continue # Ask again

                    print(f"Assertion type selected: {assertion_action}")
                    # Check if parameters are needed
                    needs_params_map = {
                        "assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"],
                        "assert_attribute_equals": ["Attribute Name", "Expected Value"],
                        "assert_element_count": ["Expected Count"]
                    }
                    if assertion_action in needs_params_map:
                        current_state = "param_input" # Move to param state
                        continue # Restart loop in new state
                    else:
                        assertion_params = {} # No params needed
                        # Proceed directly to recording
                        break # Exit loop to record

                elif user_choice == 'back_to_target':
                    current_state = "target_selection"; continue # Go back
                elif user_choice == 'skip':
                    print("Skipping assertion definition.")
                    self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion")
                    break # Exit loop, return True
                elif user_choice == 'abort':
                    self._user_abort_recording = True; break # Exit loop, return False
                else: # Includes timeout, None, unexpected values
                    print("Invalid choice or timeout. Please select assertion type.")
                    current_state = "type_selection"; continue # Stay here

            # --- State 3: Parameter Input ---
            elif current_state == "param_input":
                if not final_selector or not assertion_action: # Should not happen
                    logger.error("Assertion state error: Reached param input without selector or action.")
                    current_state = "target_selection"; continue # Go way back

                print("Panel State: Enter Assertion Parameters.")
                needs_params_map = { # Redefine here for clarity
                    "assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"],
                    "assert_attribute_equals": ["Attribute Name", "Expected Value"],
                    "assert_element_count": ["Expected Count"]
                }
                param_labels = needs_params_map.get(assertion_action, [])
                self.panel.show_assertion_params_panel(final_selector, assertion_action, param_labels)
                user_choice = self.panel.wait_for_panel_interaction(60.0) # Longer timeout for typing
                if not user_choice: user_choice = 'skip' # Default to skip on timeout

                # --- Process Param Choice ---
                if user_choice == 'submit_params':
                    raw_params = self.panel.get_assertion_parameters_from_panel(len(param_labels))
                    if raw_params is None:
                        print("Error retrieving parameters from panel. Please try again.")
                        current_state = "param_input"; continue # Stay here

                    # Map raw_params (param1, param2) to specific keys
                    assertion_params = {}
                    try:
                        if assertion_action == "assert_text_contains" or assertion_action == "assert_text_equals":
                            assertion_params["expected_text"] = raw_params.get("param1", "")
                        elif assertion_action == "assert_attribute_equals":
                            assertion_params["attribute_name"] = raw_params.get("param1", "")
                            assertion_params["expected_value"] = raw_params.get("param2", "")
                            if not assertion_params["attribute_name"]: raise ValueError("Attribute name cannot be empty.")
                        elif assertion_action == "assert_element_count":
                            count_str = raw_params.get("param1", "")
                            if not count_str.isdigit(): raise ValueError("Expected count must be a number.")
                            assertion_params["expected_count"] = int(count_str)
                        print(f"Parameters submitted: {assertion_params}")
                        break # Exit loop to record
                    except ValueError as ve:
                        print(f"Input Error: {ve}. Please correct parameters.")
                        current_state = "param_input"; continue # Stay here to retry

                elif user_choice == 'back_to_type':
                    current_state = "type_selection"; continue # Go back
                elif user_choice == 'abort':
                    self._user_abort_recording = True; break # Exit loop, return False
                else: # Includes skip, timeout, None, unexpected values
                    print("Skipping assertion definition.")
                    self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion parameters")
                    break # Exit loop, return True

            else: # Should not happen
                logger.error(f"Assertion state error: Unknown state '{current_state}'. Aborting assertion.")
                self._user_abort_recording = True; break # Exit loop, return False


        # --- End of State Machine Loop ---
        self.panel.hide_recorder_panel() # Ensure panel is hidden

        # --- Record Step if not Aborted/Skipped ---
        if not self._user_abort_recording and assertion_action and final_selector:
            # Check if loop exited normally for recording vs. skipping
            task_status = self.task_manager.subtasks[self.task_manager.current_subtask_index]['status']
            if task_status != "skipped": # Only record if not explicitly skipped
                record = {
                    "step_id": self._current_step_id,
                    "action": assertion_action,
                    "description": planned_desc, # Use original planned description
                    "parameters": assertion_params,
                    "selector": final_selector,
                    "wait_after_secs": 0 # Assertions usually don't need waits after
                }
                self.recorded_steps.append(record)
                self._current_step_id += 1
                logger.info(f"Step {record['step_id']} recorded: {assertion_action} on {final_selector}")
                self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded as assertion step {record['step_id']}")
                return True
            else:
                logger.info("Assertion definition skipped by user.")
                return True # Skipped successfully

        elif self._user_abort_recording:
            logger.warning("Assertion definition aborted by user.")
            return False # Aborted
        else:
            # Loop exited without recording (likely due to skip choice)
            logger.info("Assertion definition finished without recording (likely skipped).")
            # Task status should already be 'skipped' from within the loop
            return True

2308 |
def record(self, feature_description: str) -> Dict[str, Any]:
    """
    Runs the interactive test recording process with LLM verification and dynamic re-planning.

    Drives a full recording session: starts the browser controller, plans
    subtasks from *feature_description* via the LLM, then loops over planned
    steps and dispatches each one to a handler based on its description
    prefix: "verify"/"assert" (LLM verification), "navigate to", "scroll",
    "visually baseline", "wait for", or — by default — an interactive
    click/type step driven by an AI action suggestion. Successfully handled
    steps are appended to ``self.recorded_steps`` and finally serialized to a
    JSON file in the ``output`` directory.

    Args:
        feature_description: Natural-language description of the feature/flow
            to record. Also used for planning and for the output file name.

    Returns:
        A status dictionary with keys ``success``, ``feature``, ``message``,
        ``output_file``, ``steps_recorded``, ``duration_seconds`` (plus
        ``console_messages`` when a file was saved).
        # NOTE(review): no explicit ``return recording_status`` is visible in
        # this chunk — confirm the method returns it after the finally block.
    """
    if not self.is_recorder_mode:
        logger.error("Cannot run record() method when not in recorder mode.")
        return {"success": False, "message": "Agent not initialized in recorder mode."}

    automation_status = "Automated" if self.automated_mode else "Interactive"
    logger.info(f"--- Starting Test Recording ({automation_status}) --- Feature: {feature_description}")
    if not self.automated_mode:
        print(f"\n--- Starting Recording for Feature ({automation_status}) ---\n{feature_description}\n" + "-"*35)
    start_time = time.time()
    # Initialize recording status
    recording_status = {
        "success": False,
        "feature": feature_description,
        "message": "Recording initiated.",
        "output_file": None,
        "steps_recorded": 0,
        "duration_seconds": 0.0,
    }
    # Reset state for a new recording session
    self.history = []
    self.recorded_steps = []
    self._current_step_id = 1
    self.output_file_path = None
    self._latest_dom_state = None
    self._user_abort_recording = False
    self._consecutive_suggestion_failures = 0
    self._last_failed_step_index = -1

    try:
        logger.debug("[RECORDER] Starting browser controller...")
        self.browser_controller.start()
        self.browser_controller.clear_console_messages()

        self.task_manager.set_main_task(feature_description)
        logger.debug("[RECORDER] Planning initial steps...")
        self._plan_subtasks(feature_description) # Generates the list of planned steps

        if not self.task_manager.subtasks:
            recording_status["message"] = "❌ Recording Planning Failed: No steps generated."
            raise ValueError(recording_status["message"]) # Use ValueError for planning failure

        logger.info(f"Beginning interactive recording for {len(self.task_manager.subtasks)} initial planned steps...")
        iteration_count = 0 # General loop counter for safety
        MAX_RECORDING_ITERATIONS = self.max_iterations * 2 # Allow more iterations for potential recovery steps

        # Main recording loop: one iteration per planned-step *attempt*
        # (retries and re-planning can consume extra iterations).
        while iteration_count < MAX_RECORDING_ITERATIONS:
            iteration_count += 1
            planned_steps_count = len(self.task_manager.subtasks) # Get current count
            current_planned_task = self.task_manager.get_next_subtask()

            if self._user_abort_recording: # Abort check
                recording_status["message"] = f"Recording aborted by {'user' if not self.automated_mode else 'AI'} because {self._abort_reason if self.automated_mode else 'User had some chores to do'}."
                logger.warning(recording_status["message"])
                break

            if not current_planned_task:
                # Check if finished normally or failed planning/retries
                if self.task_manager.is_complete():
                    # Check if ANY task failed permanently
                    perm_failed_tasks = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
                    logger.error(perm_failed_tasks)
                    if perm_failed_tasks:
                        first_failed_idx = self.task_manager.subtasks.index(perm_failed_tasks[0])
                        failed_task = perm_failed_tasks[0]
                        recording_status["message"] = f"Recording process completed with failures. First failed step #{first_failed_idx+1}: {failed_task['description']} (Error: {failed_task['result']})"
                        recording_status["success"] = False # Mark as failed overall
                        logger.error(recording_status["message"])
                    elif all(t['status'] in ['done', 'skipped'] for t in self.task_manager.subtasks):
                        logger.info("All planned steps processed or skipped successfully.")
                        recording_status["message"] = "Recording process completed."
                        recording_status["success"] = True # Mark as success ONLY if no permanent failures
                    else:
                        # Should not happen if is_complete is true and perm_failed is empty
                        recording_status["message"] = "Recording finished, but final state inconsistent."
                        recording_status["success"] = False
                        logger.warning(recording_status["message"])

                else:
                    recording_status["message"] = "Recording loop ended unexpectedly (no actionable tasks found)."
                    recording_status["success"] = False
                    logger.error(recording_status["message"])
                break # Exit loop

            # Add index to the task dictionary for easier reference
            current_task_index = self.task_manager.current_subtask_index
            current_planned_task['index'] = current_task_index

            logger.info(f"\n===== Processing Planned Step {current_task_index + 1}/{planned_steps_count} (Attempt {current_planned_task['attempts']}) =====")
            if not self.automated_mode: print(f"\nProcessing Step {current_task_index + 1}: {current_planned_task['description']}")

            # --- Reset Consecutive Failure Counter if step index changes ---
            if self._last_failed_step_index != current_task_index:
                self._consecutive_suggestion_failures = 0
            self._last_failed_step_index = current_task_index # Update last processed index

            # --- State Gathering ---
            # Refresh URL, structured DOM, and a screenshot before every
            # attempt; failures here are tolerated (LLM/re-planning may cope).
            logger.info("Gathering browser state and structured DOM...")
            current_url = "Error: Could not get URL"
            dom_context_str = "Error: Could not process DOM"
            static_id_map = {}
            screenshot_bytes = None # Initialize screenshot bytes
            self._latest_dom_state = None
            self.browser_controller.clear_highlights() # Clear previous highlights
            self.panel.hide_recorder_panel()

            try:
                current_url = self.browser_controller.get_current_url()
                # Always try to get DOM state
                self._latest_dom_state = self.browser_controller.get_structured_dom(highlight_all_clickable_elements=False, viewport_expansion=-1)
                if self._latest_dom_state and self._latest_dom_state.element_tree:
                    dom_context_str, static_id_map = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
                    self._last_static_id_map = static_id_map
                else:
                    dom_context_str = "Error processing DOM structure."
                    self._last_static_id_map = {}
                    logger.error("[RECORDER] Failed to get valid DOM state.")

                # Get screenshot, especially useful for verification/re-planning
                screenshot_bytes = self.browser_controller.take_screenshot()

            except Exception as e:
                logger.error(f"Failed to gather browser state/DOM/Screenshot: {e}", exc_info=True)
                dom_context_str = f"Error gathering state: {e}"
                # Allow proceeding, LLM might handle navigation or re-planning might trigger

            # --- Handle Step Type ---
            # Dispatch on the step description prefix; anything not matched
            # below falls through to the interactive click/type path.
            planned_step_desc_lower = current_planned_task['description'].lower()
            step_handled_internally = False # Flag to indicate if step logic was fully handled here

            # --- 1. Verification Step ---
            if planned_step_desc_lower.startswith(("verify", "assert")):
                previous_error = current_planned_task.get("error") # Get validation error from last attempt
                logger.info("Handling verification step using LLM...")
                verification_result = self._get_llm_verification(
                    verification_description=current_planned_task['description'],
                    current_url=current_url,
                    dom_context_str=dom_context_str,
                    static_id_map=static_id_map,
                    screenshot_bytes=screenshot_bytes,
                    previous_error=previous_error
                )
                if verification_result:
                    # Handle user confirmation & recording based on LLM result
                    handled_ok = self._handle_llm_verification(current_planned_task, verification_result)
                    if not handled_ok and self._user_abort_recording:
                        logger.warning("User aborted during verification handling.")
                        break
                else:
                    # LLM verification failed, fallback to manual
                    failure_reason = "LLM call/parse failed for verification."
                    logger.error(f"[Verification] {failure_reason}")
                    if self.automated_mode:
                        logger.error("[Auto Mode] LLM verification call failed. Skipping step.")
                        self.task_manager.update_subtask_status(current_task_index, "skipped", result="Skipped (LLM verification failed)")
                    else:
                        print("AI verification failed. Falling back to manual assertion definition.")
                        if not self._handle_assertion_recording(current_planned_task): # Manual handler
                            self._user_abort_recording = True
                step_handled_internally = True # Verification is fully handled here or in called methods

            # --- 2. Navigation Step ---
            elif planned_step_desc_lower.startswith("navigate to"):
                try:
                    parts = re.split("navigate to", current_planned_task['description'], maxsplit=1, flags=re.IGNORECASE)
                    if len(parts) > 1 and parts[1].strip():
                        url = parts[1].strip()
                        # print(f"Action: Navigate to {url}")
                        exec_result = self._execute_action_for_recording("navigate", None, {"url": url})
                        if exec_result["success"]:
                            # Record navigation step + implicit wait
                            nav_step_id = self._current_step_id
                            self.recorded_steps.append({
                                "step_id": nav_step_id, "action": "navigate", "description": current_planned_task['description'],
                                "parameters": {"url": url}, "selector": None, "wait_after_secs": 0 # Wait handled by wait_for_load_state
                            })
                            self._current_step_id += 1
                            self.recorded_steps.append({ # Add implicit wait
                                "step_id": self._current_step_id, "action": "wait_for_load_state", "description": "Wait for page navigation",
                                "parameters": {"state": "domcontentloaded"}, "selector": None, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
                            })
                            self._current_step_id += 1
                            logger.info(f"Steps {nav_step_id}, {self._current_step_id-1} recorded: navigate and wait")
                            self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded navigation")
                            self._consecutive_suggestion_failures = 0 # Reset failure counter on success
                        else:
                            # NAVIGATION FAILED - Potential trigger for re-planning
                            logger.error(f"Navigation failed: {exec_result['message']}")
                            reason = f"Navigation to '{url}' failed: {exec_result['message']}"
                            # Try re-planning instead of immediate skip/abort
                            if self._trigger_re_planning(current_planned_task, reason):
                                logger.info("Re-planning successful, continuing with recovery steps.")
                                # Recovery steps inserted, loop will pick them up
                            else:
                                # Re-planning failed or user aborted/skipped recovery
                                if not self._user_abort_recording: # Check if abort wasn't the reason
                                    logger.warning("Re-planning failed or declined after navigation failure. Skipping original step.")
                                    # Status already updated by _trigger_re_planning if skipped/aborted
                    else:
                        raise ValueError("Could not parse URL after 'navigate to'.")
                except Exception as nav_e:
                    logger.error(f"Error processing navigation step '{current_planned_task['description']}': {nav_e}")
                    reason = f"Error processing navigation step: {nav_e}"
                    if self._trigger_re_planning(current_planned_task, reason):
                        logger.info("Re-planning successful after navigation processing error.")
                    else:
                        if not self._user_abort_recording:
                            logger.warning("Re-planning failed/declined. Marking original navigation step failed.")
                            self.task_manager.update_subtask_status(current_task_index, "failed", error=reason) # Mark as failed if no recovery

                step_handled_internally = True # Navigation handled

            # --- 3. Scroll Step ---
            elif planned_step_desc_lower.startswith("scroll"):
                try:
                    direction = "down" if "down" in planned_step_desc_lower else "up" if "up" in planned_step_desc_lower else None
                    if direction:
                        exec_result = self._execute_action_for_recording("scroll", None, {"direction": direction})
                        if exec_result["success"]:
                            self.recorded_steps.append({
                                "step_id": self._current_step_id, "action": "scroll", "description": current_planned_task['description'],
                                "parameters": {"direction": direction}, "selector": None, "wait_after_secs": 0.2
                            })
                            self._current_step_id += 1
                            logger.info(f"Step {self._current_step_id-1} recorded: scroll {direction}")
                            self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded scroll")
                            self._consecutive_suggestion_failures = 0 # Reset failure counter
                        else:
                            self.task_manager.update_subtask_status(current_task_index, "skipped", result="Optional scroll failed")
                    else:
                        self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown scroll direction")
                except Exception as scroll_e:
                    logger.error(f"Error handling scroll step: {scroll_e}")
                    self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Scroll step failed: {scroll_e}") # Mark failed
                step_handled_internally = True # Scroll handled

            # -- 4. Handle Visual Baseline Capture Step ---
            elif planned_step_desc_lower.startswith("visually baseline"):
                planned_desc = current_planned_task['description']
                logger.info(f"Handling planned step: '{planned_desc}'")

                target_description = planned_step_desc_lower.replace("visually baseline the", "").strip()
                default_baseline_id = re.sub(r'\s+', '_', target_description) # Generate default ID
                default_baseline_id = re.sub(r'[^\w\-]+', '', default_baseline_id)[:50] # Sanitize

                baseline_id = None
                target_selector = None
                capture_type = 'page' # Default to full page

                # --- Mode-dependent handling ---
                if self.automated_mode:
                    # NOTE(review): deliberately shadows the builtin print so
                    # baseline messages route to the logger; the rebinding
                    # persists for the rest of this method call — confirm intended.
                    print = lambda *args, **kwargs: logger.info(f"[Auto Mode Baseline] {' '.join(map(str, args))}")
                    baseline_id = default_baseline_id or f"baseline_{self._current_step_id}" # Ensure ID exists
                    print(f"Capturing baseline: '{baseline_id}' (Full Page - Default)")
                    # Note: Automated mode currently only supports full page baselines.
                    # To support element baselines, we'd need AI to suggest selector or pre-define targets.

                else: # Interactive Mode
                    print("\n" + "="*60)
                    print(f"Planned Step: {planned_desc}")
                    baseline_id = input(f"Enter Baseline ID (default: '{default_baseline_id}'): ").strip() or default_baseline_id
                    capture_choice = input("Capture Full Page (P) or Specific Element (E)? [P]: ").strip().lower()

                    if capture_choice == 'e':
                        capture_type = 'element'
                        print("Click the element to capture as baseline...")
                        self.browser_controller.clear_highlights() # Clear any previous
                        listener_setup = self.browser_controller.setup_click_listener()
                        if listener_setup:
                            try:
                                target_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0)
                                if target_selector:
                                    print(f"Element selected. Using selector: {target_selector}")
                                    # Highlight the selected element briefly
                                    try:
                                        self.browser_controller.highlight_element(target_selector, 0, color="#00FF00", text="Baseline Element")
                                        time.sleep(1.5) # Show highlight briefly
                                    except: pass # Ignore highlight errors
                                else:
                                    print("No element selected (timeout). Defaulting to Full Page.")
                                    capture_type = 'page'
                            except Exception as e:
                                logger.error(f"Error during element selection for baseline: {e}")
                                print("Error selecting element. Defaulting to Full Page.")
                                capture_type = 'page'
                            self.browser_controller.remove_click_listener() # Clean up listener
                        else:
                            print("Error setting up click listener. Defaulting to Full Page.")
                            capture_type = 'page'
                    else: # Default to Page
                        print("Capturing Full Page baseline.")
                        capture_type = 'page'

                # --- Capture and Save Baseline ---
                capture_success = False
                final_screenshot_bytes = None
                if capture_type == 'element' and target_selector:
                    final_screenshot_bytes = self.browser_controller.take_screenshot_element(target_selector)
                    if final_screenshot_bytes:
                        capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=target_selector)
                    else:
                        logger.error(f"Failed to capture element screenshot for baseline '{baseline_id}' selector '{target_selector}'.")
                        if not self.automated_mode: print("Error: Failed to capture element screenshot.")
                else: # Full page
                    # Use the screenshot already taken during state gathering if available
                    final_screenshot_bytes = screenshot_bytes
                    if final_screenshot_bytes:
                        capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=None)
                    else:
                        logger.error(f"Failed to capture full page screenshot for baseline '{baseline_id}'.")
                        if not self.automated_mode: print("Error: Failed to capture full page screenshot.")

                # --- Record assert_visual_match step ---
                if capture_success:
                    record = {
                        "step_id": self._current_step_id,
                        "action": "assert_visual_match", # The corresponding execution action
                        "description": planned_desc, # Use the baseline description
                        "parameters": {"baseline_id": baseline_id},
                        "selector": target_selector, # Null for page, selector for element
                        "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
                    }
                    self.recorded_steps.append(record)
                    self._current_step_id += 1
                    logger.info(f"Step {record['step_id']} recorded: assert_visual_match for baseline '{baseline_id}' ({'Element' if target_selector else 'Page'})")
                    self.task_manager.update_subtask_status(current_task_index, "done", result=f"Recorded baseline '{baseline_id}'")
                    self._consecutive_suggestion_failures = 0
                else:
                    # Baseline capture/save failed
                    logger.error(f"Failed to save baseline '{baseline_id}'. Skipping recording.")
                    if not self.automated_mode: print(f"Failed to save baseline '{baseline_id}'. Skipping.")
                    self.task_manager.update_subtask_status(current_task_index, "skipped", result="Failed to save baseline")

                step_handled_internally = True # Baseline capture handled

            # --- 5. Wait Step ---
            elif planned_step_desc_lower.startswith("wait for"):
                logger.info(f"Handling planned wait step: {current_planned_task['description']}")
                wait_params = {}
                wait_selector = None
                wait_desc = current_planned_task['description']
                parsed_ok = False # Flag to check if parameters were parsed
                try:
                    # Try to parse common patterns
                    time_match = re.search(r"wait for (\d+(\.\d+)?)\s+seconds?", planned_step_desc_lower)
                    # Updated regex to be more flexible with optional 'element' word and quotes
                    element_match = re.search(r"wait for (?:element\s*)?\'?(.*?)\'?\s+to be\s+(\w+)", planned_step_desc_lower)
                    url_match = re.search(r"wait for url\s*\'?(.*?)\'?", planned_step_desc_lower)

                    if time_match:
                        wait_params["timeout_seconds"] = float(time_match.group(1))
                        wait_action = "wait"
                        parsed_ok = True
                    elif element_match:
                        element_desc_for_selector = element_match.group(1).strip()
                        state = element_match.group(2).strip()
                        wait_params["state"] = state
                        # --- Attempt to resolve selector during recording ---
                        # For simplicity, let's *assume* the description IS the selector for now.
                        # A better approach would use LLM or prompt user.
                        if element_desc_for_selector.startswith(('#', '.', '[')) or '/' in element_desc_for_selector:
                            wait_selector = element_desc_for_selector
                            logger.info(f"Using description '{wait_selector}' directly as selector for wait.")
                        else:
                            logger.warning(f"Cannot directly use '{element_desc_for_selector}' as selector. Wait step might fail execution. Recording intent.")
                            # Record without selector, executor might fail unless enhanced
                            wait_desc += f" (Element Description: {element_desc_for_selector})" # Add detail to desc

                        wait_params["selector"] = wait_selector # Store selector (or None) in params for execution call
                        wait_action = "wait" # Still use generic wait
                        parsed_ok = True
                    elif url_match:
                        wait_params["url"] = url_match.group(1) # URL pattern
                        wait_action = "wait" # Use generic wait
                        parsed_ok = True
                    else:
                        logger.warning(f"Could not parse wait parameters from: '{current_planned_task['description']}'. Skipping.")
                        self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown wait format")
                        wait_action = None
                        parsed_ok = False

                    if parsed_ok and wait_action:
                        # --- Execute the wait ---
                        logger.info(f"Executing wait action: {wait_params}")
                        wait_exec_result = self.browser_controller.wait(**wait_params)

                        if wait_exec_result["success"]:
                            logger.info("Wait execution successful during recording.")
                            # --- Record the step AFTER successful execution ---
                            self.recorded_steps.append({
                                "step_id": self._current_step_id, "action": wait_action, "description": wait_desc,
                                "parameters": wait_params, # Use the parsed params
                                "selector": wait_selector, # Record selector if found
                                "wait_after_secs": 0
                            })
                            self._current_step_id += 1
                            logger.info(f"Step {self._current_step_id-1} recorded: {wait_action} with params {wait_params}")
                            self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded and executed wait step")
                        else:
                            # Wait failed during recording
                            logger.error(f"Wait execution FAILED during recording: {wait_exec_result['message']}")
                            # Decide how to handle: Skip? Fail? Abort? Let's skip for now.
                            if not self.automated_mode:
                                cont = input("Wait failed. Skip this step (S) or Abort recording (A)? [S]: ").strip().lower()
                                if cont == 'a':
                                    self._user_abort_recording = True
                                    self._abort_reason = "User aborted after wait failed."
                                else:
                                    self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")
                            else: # Automated mode - just skip
                                self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")


                except Exception as wait_e:
                    logger.error(f"Error parsing or executing wait step: {wait_e}")
                    # Mark as failed if parsing/execution error occurred
                    self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Wait step processing failed: {wait_e}")

                step_handled_internally = True


            # --- 4. Default: Assume Interactive Click/Type ---
            if not step_handled_internally:
                # --- AI Suggestion ---
                logger.critical(dom_context_str)
                ai_suggestion = self._determine_action_and_selector_for_recording(
                    current_planned_task, current_url, dom_context_str
                )

                # --- Handle Suggestion Result ---
                if not ai_suggestion or ai_suggestion.get("action") == "suggestion_failed":
                    reason = ai_suggestion.get("reasoning", "LLM failed to provide valid suggestion.") if ai_suggestion else "LLM suggestion generation failed."
                    logger.error(f"AI suggestion failed for step {current_task_index + 1}: {reason}")
                    self._consecutive_suggestion_failures += 1
                    # Check if we should try re-planning due to repeated failures
                    if self._consecutive_suggestion_failures > self.task_manager.max_retries_per_subtask:
                        logger.warning(f"Maximum suggestion retries exceeded for step {current_task_index + 1}. Triggering re-planning.")
                        replan_reason = f"AI failed to suggest an action/selector repeatedly for step: '{current_planned_task['description']}'. Last reason: {reason}"
                        if self._trigger_re_planning(current_planned_task, replan_reason):
                            logger.info("Re-planning successful after suggestion failures.")
                            # Loop continues with recovery steps
                        else:
                            # Re-planning failed or user aborted/skipped
                            if not self._user_abort_recording:
                                logger.error("Re-planning failed/declined. Marking original step as failed permanently.")
                                self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Failed permanently after repeated suggestion errors and failed re-planning attempt. Last reason: {reason}", force_update=True)
                    else:
                        # Mark as failed for normal retry by TaskManager
                        self.task_manager.update_subtask_status(current_task_index, "failed", error=reason)
                        # Continue loop, TaskManager will offer retry if possible

                elif ai_suggestion.get("action") == "action_not_applicable":
                    reason = ai_suggestion.get("reasoning", "Step not a click/type.")
                    logger.info(f"Planned step '{current_planned_task['description']}' determined not applicable by AI. Skipping. Reason: {reason}")
                    # Could this trigger re-planning? Maybe if it happens unexpectedly. For now, treat as skip.
                    self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Skipped non-interactive step ({reason})")
                    self._consecutive_suggestion_failures = 0 # Reset counter on skip

                elif ai_suggestion.get("action") in ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]:
                    # --- Handle Interactive Step (Confirmation/Override/Execution) ---
                    # This method now returns True if handled (recorded, skipped, retry requested), False if aborted
                    # It also internally updates task status based on outcome.
                    handled_ok = self._handle_interactive_step_recording(current_planned_task, ai_suggestion)

                    if not handled_ok and self._user_abort_recording:
                        logger.warning("User aborted during interactive step handling.")
                        break # Exit main loop immediately on abort

                    # Check if the step failed execution and might need re-planning
                    current_task_status = self.task_manager.subtasks[current_task_index]['status']
                    if current_task_status == 'failed':
                        # _handle_interactive_step_recording marks failed if execution fails and user doesn't skip/abort
                        # Check if it was an execution failure (not just suggestion retry)
                        error_msg = self.task_manager.subtasks[current_task_index].get('error', '')
                        if "Execution failed" in error_msg: # Check for execution failure messages
                            logger.warning(f"Execution failed for step {current_task_index + 1}. Triggering re-planning.")
                            replan_reason = f"Action execution failed for step '{current_planned_task['description']}'. Error: {error_msg}"
                            if self._trigger_re_planning(current_planned_task, replan_reason):
                                logger.info("Re-planning successful after execution failure.")
                                # Loop continues with recovery steps
                            else:
                                # Re-planning failed or declined
                                if not self._user_abort_recording:
                                    logger.error("Re-planning failed/declined after execution error. Step remains failed.")
                                    # Task already marked as failed by _handle_interactive_step_recording
                        # else: It was likely marked failed to retry suggestion - allow normal retry flow

                    elif current_task_status == 'done' or current_task_status == 'skipped':
                        self._consecutive_suggestion_failures = 0 # Reset failure counter on success/skip

                else: # Should not happen
                    logger.error(f"Unexpected AI suggestion action: {ai_suggestion.get('action')}. Skipping step.")
                    self.task_manager.update_subtask_status(current_task_index, "failed", error="Unexpected AI action suggestion")


            # --- Cleanup after processing a step attempt ---
            self.browser_controller.clear_highlights()
            # Listener removal is handled within _handle_interactive_step_recording and wait_for_user_click...
            # self.browser_controller.remove_click_listener() # Ensure listener is off - redundant?

            # Small delay between steps/attempts
            if not self._user_abort_recording: # Don't delay if aborting
                time.sleep(0.3)


        # --- Loop End ---
        if not recording_status["success"] and iteration_count >= MAX_RECORDING_ITERATIONS:
            recording_status["message"] = f"⚠️ Recording Stopped: Maximum iterations ({MAX_RECORDING_ITERATIONS}) reached."
            recording_status["success"] = False # Ensure max iterations means failure
            logger.warning(recording_status["message"])

        # --- Final Save ---
        # Serialize recorded steps to output/<name>.json unless aborted.
        if not self._user_abort_recording and self.recorded_steps:
            try:
                if recording_status.get("success", False): # Only check if currently marked as success
                    perm_failed_tasks_final = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
                    if perm_failed_tasks_final:
                        recording_status["success"] = False # Override success if any task failed
                        recording_status["message"] = recording_status["message"].replace("completed.", "completed with failures.") # Adjust message
                        logger.warning("Overriding overall success status to False due to permanently failed steps found.")
                output_data = {
                    "test_name": f"{feature_description[:50]}_Test",
                    "feature_description": feature_description,
                    # NOTE(review): datetime.utcnow() is naive and deprecated in
                    # 3.12+; timezone is imported — consider datetime.now(timezone.utc).
                    "recorded_at": datetime.utcnow().isoformat() + "Z",
                    "console_logs": self.browser_controller.console_messages,
                    "steps": self.recorded_steps
                }
                recording_status["console_messages"] = self.browser_controller.console_messages
                ts = time.strftime("%Y%m%d_%H%M%S")
                safe_feature_name = re.sub(r'[^\w\-]+', '_', feature_description)[:50]
                if self.file_name is None:
                    self.file_name = f"test_{safe_feature_name}_{ts}.json"
                else:
                    self.file_name = self.file_name+f"{safe_feature_name}_{ts}_test.json"
                output_dir = "output"
                if not os.path.exists(output_dir):
                    os.makedirs(output_dir)
                self.output_file_path = os.path.join(output_dir, self.file_name)

                with open(self.output_file_path, 'w', encoding='utf-8') as f:
                    json.dump(output_data, f, indent=2, ensure_ascii=False)

                recording_status["output_file"] = self.output_file_path
                recording_status["steps_recorded"] = len(self.recorded_steps)
                # Set success only if we saved something and didn't explicitly fail/abort
                if recording_status["success"]:
                    logger.info(f"Recording successfully saved to: {self.output_file_path}")
                else:
                    logger.warning(f"Recording finished with status: {'Failed' if not self._user_abort_recording else 'Aborted'}. Saved {len(self.recorded_steps)} steps to: {self.output_file_path}. Message: {recording_status.get('message')}")


            except Exception as save_e:
                logger.error(f"Failed to save recorded steps to JSON: {save_e}", exc_info=True)
                recording_status["message"] = f"Failed to save recording: {save_e}"
                recording_status["success"] = False
        elif self._user_abort_recording:
            if not self._abort_reason:
                recording_status["message"] = "Recording aborted by user. No file saved."
                recording_status["success"] = False
        else: # No steps recorded
            recording_status["message"] = "No steps were recorded."
            recording_status["success"] = False

    except ValueError as e: # Catch planning errors specifically
        logger.critical(f"Test planning failed: {e}", exc_info=True)
        recording_status["message"] = f"❌ Test Planning Failed: {e}"
        recording_status["success"] = False # Ensure failure state
    except Exception as e:
        logger.critical(f"An critical unexpected error occurred during recording: {e}", exc_info=True)
        recording_status["message"] = f"❌ Critical Error during recording: {e}"
        recording_status["success"] = False # Ensure failure state
    finally:
        logger.info("--- Ending Test Recording ---")
        # Ensure cleanup even if browser wasn't started fully
        if hasattr(self, 'browser_controller') and self.browser_controller:
            self.browser_controller.clear_highlights()
            self.browser_controller.remove_click_listener() # Attempt removal
            self.panel.remove_recorder_panel()
            self.browser_controller.close()

        end_time = time.time()
        recording_status["duration_seconds"] = round(end_time - start_time, 2)
        logger.info(f"Recording process finished in {recording_status['duration_seconds']:.2f} seconds.")
        logger.info(f"Final Recording Status: {'Success' if recording_status['success'] else 'Failed/Aborted'} - {recording_status['message']}")
        if recording_status.get("output_file"):
            logger.info(f"Output file: {recording_status.get('output_file')}")
2898 |
2899 | return recording_status # Return the detailed status dictionary
2900 |
```