diff --git a/config/config.yaml.example b/config/config.yaml.example
index 2548ff1..ede6f85 100644
--- a/config/config.yaml.example
+++ b/config/config.yaml.example
@@ -103,10 +103,12 @@ browser_config:
   headless: False  # Docker environment will automatically override to True
   language: zh-CN
   cookies: []
-  save_screenshots: False  # Whether to save screenshots to local disk (default: False)
 
 report:
   language: en-US  # zh-CN, en-US
+  save_screenshots: False  # Whether to enable screenshots. If True, screenshots are saved as local files. If False, screenshots are stored as base64 in the test data.
+  report_dir: null  # null = use default 'reports/test_{timestamp}/', or a custom path like './my_reports/'
+
 
 log:
   level: info
diff --git a/config/config_run.yaml.example b/config/config_run.yaml.example
index 74e7190..fc6b65e 100644
--- a/config/config_run.yaml.example
+++ b/config/config_run.yaml.example
@@ -53,3 +53,5 @@ log:
 
 report:
   language: en-US  # zh-CN, en-US
+  save_screenshots: False  # Whether to enable screenshots. If True, screenshots are saved as local files. If False, screenshots are stored as base64 in the test data.
+  report_dir: null  # null = use default 'reports/test_{timestamp}/', or a custom path like './my_reports/'
diff --git a/package.json b/package.json
index 0565064..cd78b11 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "webqa-agent",
-  "version": "0.2.2.post1",
+  "version": "0.2.3",
   "description": "WebQA Agent is an autonomous web browser agent that audits performance, functionality & UX for engineers and vibe-coding creators.",
   "dependencies": {
     "chrome-launcher": "^1.2.0",
diff --git a/pyproject.toml b/pyproject.toml
index f73f191..7560860 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "webqa-agent"
-version = "0.2.2.post1"
+version = "0.2.3"
 description = "WebQA Agent is an autonomous web browser agent that audits performance, functionality & UX for engineers and vibe-coding creators."
 readme = "README.md"
 requires-python = ">=3.11"
diff --git a/webqa_agent/__init__.py b/webqa_agent/__init__.py
index 67c3d98..742c24f 100644
--- a/webqa_agent/__init__.py
+++ b/webqa_agent/__init__.py
@@ -1,4 +1,4 @@
 __all__ = [
 ]
 
-__version__ = '0.2.2.post1'
+__version__ = '0.2.3'
diff --git a/webqa_agent/actions/action_handler.py b/webqa_agent/actions/action_handler.py
index fee19f1..f68f39a 100644
--- a/webqa_agent/actions/action_handler.py
+++ b/webqa_agent/actions/action_handler.py
@@ -4,6 +4,7 @@
 import json
 import logging
 import os
+import random
 import re
 from contextvars import ContextVar
 from dataclasses import dataclass, field
@@ -15,6 +16,7 @@
 
 # ===== Action Context Infrastructure for Error Propagation =====
 action_context_var: ContextVar[Optional['ActionContext']] = ContextVar('action_context', default=None)
+screenshot_prefix_var: ContextVar[str] = ContextVar('screenshot_prefix', default='')
 
 
 @dataclass
@@ -69,35 +71,68 @@ class ActionHandler:
 
     # Session management for screenshot organization
     _screenshot_session_dir: Optional[Path] = None
    _screenshot_session_timestamp: Optional[str] = None
-    _save_screenshots: bool = False  # Default: not save screenshots to disk
+    _save_screenshots_locally: bool = False  # Whether to save screenshots as files
 
     @classmethod
-    def set_screenshot_config(cls, save_screenshots: bool = False):
-        """Set global screenshot saving behavior.
+    def clear_screenshot_session(cls):
+        """Clear the current screenshot session state.
+
+        This should be called at the start of a new test session to ensure
+        isolation from previous runs in the same process.
+        """
+        cls._screenshot_session_dir = None
+        cls._screenshot_session_timestamp = None
+        logging.debug('Screenshot session state cleared')
+
+    @classmethod
+    def set_screenshot_config(cls, save_screenshots: bool):
+        """Set whether to save screenshots locally.
 
         Args:
-            save_screenshots: Whether to save screenshots to local disk (default: False)
+            save_screenshots: If True, screenshots are saved as files.
+                If False, only base64 data is kept.
         """
-        cls._save_screenshots = save_screenshots
-        logging.debug(f'Screenshot saving config set to: {save_screenshots}')
+        cls._save_screenshots_locally = save_screenshots
+        logging.info(f'Screenshot configuration updated: save_locally={save_screenshots}')
 
     @classmethod
-    def init_screenshot_session(cls) -> Path:
+    def init_screenshot_session(cls, custom_report_dir: Optional[str] = None) -> Path:
         """Initialize screenshot session directory for this test run.
 
-        Creates a timestamped directory under webqa_agent/crawler/screenshots/
-        for organizing all screenshots from a single test session.
+        Creates a screenshot directory under the report directory. All screenshots
+        from the test session will be organized in this directory.
+
+        Args:
+            custom_report_dir: Custom report directory from config.
+                If None, uses the default 'reports/test_{timestamp}/'
 
         Returns:
-            Path: The session directory path
+            Path: The screenshot directory path
+
+        Examples:
+            >>> # Default directory
+            >>> ActionHandler.init_screenshot_session()
+            PosixPath('reports/test_2026-01-05_15-55-47_123456/screenshots')
+
+            >>> # Custom directory
+            >>> ActionHandler.init_screenshot_session('./my_reports')
+            PosixPath('my_reports/screenshots')
         """
-        if cls._screenshot_session_dir is None:
-            timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
-            base_dir = Path(__file__).parent.parent / 'crawler' / 'screenshots'
-            cls._screenshot_session_dir = base_dir / timestamp
+        if cls._save_screenshots_locally and cls._screenshot_session_dir is None:
+            timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP') or datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')
             cls._screenshot_session_timestamp = timestamp
+
+            if custom_report_dir:
+                # User-defined directory: {custom_report_dir}/screenshots/
+                report_base = Path(custom_report_dir)
+            else:
+                # Default directory: reports/test_{timestamp}/
+                report_base = Path('reports') / f'test_{timestamp}'
+
+            cls._screenshot_session_dir = report_base / 'screenshots'
             cls._screenshot_session_dir.mkdir(parents=True, exist_ok=True)
-            logging.info(f'Initialized screenshot session directory: {cls._screenshot_session_dir}')
+            logging.info(f'Initialized screenshot directory: {cls._screenshot_session_dir}')
+
         return cls._screenshot_session_dir
 
     def __init__(self):
@@ -1651,9 +1686,8 @@ async def b64_page_screenshot(
         full_page: bool = False,
         file_name: Optional[str] = None,
         context: str = 'default'
-    ) -> Optional[str]:
-        """Get page screenshot (Base64 encoded) and optionally save to local
-        file.
+    ) -> tuple[Optional[str], Optional[str]]:
+        """Get page screenshot as base64 and save to local file.
 
         Args:
             full_page: whether to capture the whole page
@@ -1661,35 +1695,49 @@
             file_name: screenshot file name
             context: test context category (e.g., 'test', 'agent', 'scroll', 'error')
 
         Returns:
-            str: screenshot base64 encoded, or None if screenshot fails
+            tuple[Optional[str], Optional[str]]: (base64_data, file_path)
+            - base64_data: For LLM requests (data:image/png;base64,...)
+            - file_path: Relative path to saved screenshot file, or None if not saved
 
         Note:
-            The screenshot is always returned as base64 for HTML reports and LLM analysis.
-            Local file saving is controlled by the _save_screenshots class variable.
+            Screenshots are saved to local disk if _save_screenshots_locally is True.
+            The base64 data is always returned for LLM analysis and fallback rendering.
         """
         try:
             # Get current active page (dynamically resolves to latest page)
             current_page = self._get_current_page()
             timeout = 90000 if full_page else 60000  # 90s for full page, 60s for viewport
 
-            # Prepare file path only if saving is enabled
             file_path_str = None
-            if self._save_screenshots:
-                # Initialize session directory if needed
+            relative_path = None
+
+            # Only prepare file path and session if local saving is enabled
+            if self._save_screenshots_locally:
                 session_dir = self.init_screenshot_session()
 
                 # Generate timestamp and filename
-                timestamp = datetime.datetime.now().strftime('%H%M%S')
+                # Use high-precision timestamp and random suffix to avoid collisions in parallel execution
+                now = datetime.datetime.now()
+                timestamp = now.strftime('%H%M%S_%f')
+                random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=4))
+
+                # Get prefix from context variable (set by workers to distinguish cases/tests)
+                prefix = screenshot_prefix_var.get()
+                prefix_part = f'{prefix}_' if prefix else ''
 
-                # Build filename: {timestamp}_{context}_{file_name}.png
                 if file_name:
-                    filename = f'{timestamp}_{context}_{file_name}.png'
+                    filename = f'{prefix_part}{timestamp}_{random_suffix}_{context}_{file_name}.png'
                 else:
-                    filename = f'{timestamp}_{context}_screenshot.png'
+                    filename = f'{prefix_part}{timestamp}_{random_suffix}_{context}_screenshot.png'
 
-                file_path_str = str(session_dir / filename)
+                file_path = session_dir / filename
+                file_path_str = str(file_path)
 
-            # Capture screenshot (with or without file saving based on config)
+                # Return path relative to the report root for HTML rendering
+                # Screenshots are stored in report_dir/screenshots/ and report is in report_dir/run_report.html
+                relative_path = os.path.join(session_dir.name, filename)
+
+            # Capture screenshot (always returns bytes)
             screenshot_bytes = await self.take_screenshot(
                 current_page,
                 full_page=full_page,
@@ -1697,20 +1745,20 @@
                 timeout=timeout
             )
 
-            # Convert to Base64 for HTML reports
+            # Convert to Base64 for LLM requests
             screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
             base64_data = f'data:image/png;base64,{screenshot_base64}'
 
-            if self._save_screenshots and file_path_str:
-                logging.debug(f'Screenshot saved to {file_path_str}')
+            if self._save_screenshots_locally:
+                logging.debug(f'Screenshot saved: {file_path_str}, relative_path: {relative_path}')
             else:
-                logging.debug('Screenshot captured (not saved to disk)')
+                logging.debug('Screenshot captured as base64 only (local saving disabled)')
 
-            return base64_data
+            return base64_data, relative_path
 
         except Exception as e:
             logging.warning(f'Failed to capture screenshot: {e}')
-            return None
+            return None, None
 
     async def take_screenshot(
         self,
@@ -1719,21 +1767,20 @@
         page,
         full_page: bool = False,
         file_path: str | None = None,
         timeout: float = 120000,
     ) -> bytes:
-        """Get page screenshot (binary)
+        """Get page screenshot (binary) and save to disk.
 
         Args:
             page: page object
             full_page: whether to capture the whole page
-            file_path: screenshot save path (only used when save_screenshots=True)
+            file_path: screenshot save path (optional; saved only when provided)
             timeout: timeout (milliseconds)
 
         Returns:
             bytes: screenshot binary data
 
         Note:
-            If save_screenshots is False, the screenshot will not be saved to disk
-            regardless of the file_path parameter. The method always returns the
-            screenshot bytes for in-memory use (e.g., Base64 encoding).
+            The screenshot is saved to file_path whenever a path is provided.
+            The method always returns the screenshot bytes for base64 encoding.
         """
         try:
             # Shortened and more lenient load state check
@@ -1743,7 +1790,7 @@
         except Exception as e:
             logging.debug(f'Load state check: {e}; proceeding with screenshot')
 
-        logging.debug(f'Taking screenshot (full_page={full_page}, save={self._save_screenshots}, timeout={timeout}ms)')
+        logging.debug(f'Taking screenshot (full_page={full_page}, timeout={timeout}ms)')
 
         # Prepare screenshot options with Playwright best practices
         screenshot_options = {
@@ -1753,12 +1800,10 @@
             'caret': 'hide',  # Hide text input cursor for cleaner screenshots
         }
 
-        # Only save to disk if _save_screenshots is True and file_path is provided
-        if self._save_screenshots and file_path:
+        # Save to disk if file_path is provided
+        if file_path:
             screenshot_options['path'] = file_path
             logging.debug(f'Screenshot will be saved to: {file_path}')
-        elif not self._save_screenshots:
-            logging.debug('Screenshot saving disabled, returning bytes only')
 
         # Capture screenshot with optimized options
         screenshot: bytes = await page.screenshot(**screenshot_options)
diff --git a/webqa_agent/actions/click_handler.py b/webqa_agent/actions/click_handler.py
index 5727b82..c19554f 100644
--- a/webqa_agent/actions/click_handler.py
+++ b/webqa_agent/actions/click_handler.py
@@ -88,7 +88,9 @@ async def click_and_screenshot(
         'response_errors': [],
         'screenshot_before': None,
         'screenshot_after': None,
+        'screenshot_after_path': None,
         'new_page_screenshot': None,
+        'new_page_screenshot_path': None,
         'click_method': None,
         'click_coordinates': None,
         'has_new_page': False,
@@ -122,11 +124,12 @@ def handle_new_page(page_obj):
                 new_page_action_handler = ActionHandler()
                 new_page_action_handler.page = new_page
 
-                screenshot_b64 = await new_page_action_handler.b64_page_screenshot(
+                screenshot_b64, screenshot_path = await new_page_action_handler.b64_page_screenshot(
                     file_name=f'element_{element_index}_new_page',
                     context='test'
                 )
                 click_result['new_page_screenshot'] = screenshot_b64
+                click_result['new_page_screenshot_path'] = screenshot_path
                 logging.debug('New page screenshot saved')
 
             except Exception as e:
@@ -135,11 +138,12 @@ def handle_new_page(page_obj):
 
                 await page.wait_for_load_state('networkidle', timeout=30000)
             else:
-                screenshot_b64 = await action_handler.b64_page_screenshot(
+                screenshot_b64, screenshot_path = await action_handler.b64_page_screenshot(
                     file_name=f'element_{element_index}_after_click',
                     context='test'
                 )
                 click_result['screenshot_after'] = screenshot_b64
+                click_result['screenshot_after_path'] = screenshot_path
                 logging.debug('After click screenshot saved')
 
         else:
diff --git a/webqa_agent/actions/scroll_handler.py b/webqa_agent/actions/scroll_handler.py
index fae37b6..944ccbf 100644
--- a/webqa_agent/actions/scroll_handler.py
+++ b/webqa_agent/actions/scroll_handler.py
@@ -1,6 +1,5 @@
 import asyncio
 import logging
-import time
 
 from playwright.async_api import Page
 
@@ -121,13 +120,16 @@ async def capture_viewport(screenshot_counter=0):
 
             if capture_screenshots:
                 processed_filename = f'{page_identifier}_global_viewport_{screenshot_counter}'
-                screenshot_base64 = await self._action_handler.b64_page_screenshot(
+                screenshot_base64, screenshot_path = await self._action_handler.b64_page_screenshot(
                     file_name=processed_filename,
                     context='scroll'
                 )
                 if screenshot_base64:
-                    screenshot_image_list.append(screenshot_base64)
+                    screenshot_image_list.append({
+                        'base64': screenshot_base64,
+                        'path': screenshot_path
+                    })
 
         scroll_count = 0
         await capture_viewport(scroll_count)
@@ -167,13 +169,16 @@ async def capture_viewport(screenshot_counter=0):
 
             if capture_screenshots:
                 processed_filename = f'{page_identifier}_container_viewport_{screenshot_counter}'
-                screenshot_base64 = await self._action_handler.b64_page_screenshot(
+                screenshot_base64, screenshot_path = await self._action_handler.b64_page_screenshot(
                     file_name=processed_filename,
                     context='scroll'
                 )
                 if screenshot_base64:
-                    screenshot_image_list.append(screenshot_base64)
+                    screenshot_image_list.append({
+                        'base64': screenshot_base64,
+                        'path': screenshot_path
+                    })
 
         try:
             container_exists = await self.page.evaluate(
@@ -298,12 +303,15 @@ async def scroll_and_crawl(
         if not scroll:
             logging.debug('Scrolling disabled, exiting after initial capture.')
             processed_filename = f'{page_identifier}_initial'
-            screenshot_base64 = await self._action_handler.b64_page_screenshot(
+            screenshot_base64, screenshot_path = await self._action_handler.b64_page_screenshot(
                 file_name=processed_filename,
                 context='scroll'
             )
             if screenshot_base64:
-                screenshot_image_list.append(screenshot_base64)
+                screenshot_image_list.append({
+                    'base64': screenshot_base64,
+                    'path': screenshot_path
+                })
             return screenshot_image_list
 
         try:
@@ -354,22 +362,28 @@ async def scroll_and_crawl(
             else:
                 logging.debug('No scrollable containers found, taking single screenshot')
                 processed_filename = f'{page_identifier}_no_scroll'
-                screenshot_base64 = await self._action_handler.b64_page_screenshot(
+                screenshot_base64, screenshot_path = await self._action_handler.b64_page_screenshot(
                     file_name=processed_filename,
                     context='scroll'
                 )
                 if screenshot_base64:
-                    screenshot_image_list.append(screenshot_base64)
+                    screenshot_image_list.append({
+                        'base64': screenshot_base64,
+                        'path': screenshot_path
+                    })
 
         except Exception as e:
             logging.error(f'Error in smart scroll: {e}')
             # if error, at least take one screenshot
             processed_filename = f'{page_identifier}_error_fallback'
-            screenshot_base64 = await self._action_handler.b64_page_screenshot(
+            screenshot_base64, screenshot_path = await self._action_handler.b64_page_screenshot(
                 file_name=processed_filename,
                 context='error'
             )
             if screenshot_base64:
-                screenshot_image_list.append(screenshot_base64)
+                screenshot_image_list.append({
+                    'base64': screenshot_base64,
+                    'path': screenshot_path
+                })
 
         return screenshot_image_list
diff --git a/webqa_agent/browser/check.py b/webqa_agent/browser/check.py
index 7de2365..a677e45 100644
--- a/webqa_agent/browser/check.py
+++ b/webqa_agent/browser/check.py
@@ -195,8 +195,8 @@ async def response_callback(response):
                 else:
                     try:
                         if any(
-                            bin_type in content_type.lower()
-                            for bin_type in [
+                            asset_type in content_type.lower()
+                            for asset_type in [
                                 'image/',
                                 'audio/',
                                 'video/',
@@ -204,15 +204,23 @@ async def response_callback(response):
                                 'application/octet-stream',
                                 'font/',
                                 'application/x-font',
+                                'application/javascript',
+                                'application/x-javascript',
+                                'text/javascript',
+                                'text/css',
                             ]
                         ):
-                            response_data['body'] = f'<{content_type} binary data>'
+                            response_data['body'] = f'<{content_type} asset omitted>'
                             response_data['size'] = len(await response.body())
 
                         elif 'application/json' in content_type:
                             try:
-                                body = await response.json()
-                                response_data['body'] = body
+                                body_bytes = await response.body()
+                                if len(body_bytes) > 100000:
+                                    response_data['body'] = f'<application/json body omitted: {len(body_bytes)} bytes>'
+                                    response_data['size'] = len(body_bytes)
+                                else:
+                                    response_data['body'] = json.loads(body_bytes)
                             except Exception as e:
                                 response_data['error'] = f'JSON parse error: {str(e)}'
 
@@ -220,14 +228,17 @@ async def response_callback(response):
                             text_type in content_type.lower()
                             for text_type in [
                                 'text/',
-                                'application/javascript',
                                 'application/xml',
                                 'application/x-www-form-urlencoded',
                             ]
                         ):
                             try:
                                 text_body = await response.text()
-                                response_data['body'] = text_body
+                                if len(text_body) > 50000:
+                                    response_data['body'] = text_body[:50000] + '\n... [truncated]'
+                                    response_data['size'] = len(text_body)
+                                else:
+                                    response_data['body'] = text_body
                             except Exception as e:
                                 response_data['error'] = f'Text decode error: {str(e)}'
diff --git a/webqa_agent/cli.py b/webqa_agent/cli.py
index de00bf3..688f751 100644
--- a/webqa_agent/cli.py
+++ b/webqa_agent/cli.py
@@ -294,15 +294,6 @@ async def run_tests(cfg, execution_mode, config_path: str = None, workers: int =
     is_docker = os.getenv('DOCKER_ENV') == 'true'
     print(f"🏃 Runtime: {'Docker container' if is_docker else 'Local environment'}")
 
-    # Configure screenshot saving
-    from webqa_agent.actions.action_handler import ActionHandler
-    save_screenshots = cfg.get('browser_config', {}).get('save_screenshots', False)
-    ActionHandler.set_screenshot_config(save_screenshots=save_screenshots)
-    if not save_screenshots:
-        print('📸 Screenshot saving: disabled (screenshots will be captured but not saved to disk)')
-    else:
-        print('📸 Screenshot saving: enabled')
-
     # Execute based on mode
     if execution_mode == 'run':
         await execute_run_mode(config_path, workers=workers)
diff --git a/webqa_agent/data/test_structures.py b/webqa_agent/data/test_structures.py
index 3edadee..83a1321 100644
--- a/webqa_agent/data/test_structures.py
+++ b/webqa_agent/data/test_structures.py
@@ -155,7 +155,8 @@ def duration(self) -> Optional[float]:
 
 class SubTestScreenshot(BaseModel):
     type: str
-    data: str  # base64 encoded image data
+    data: str  # base64 encoded image data or relative path
+    label: Optional[str] = None
 
 
 class SubTestAction(BaseModel):
@@ -185,7 +186,7 @@ class SubTestResult(BaseModel):
 
     TODO: Update type of `messages`
     """
-    sub_test_id: Optional[str] = ""  # corresponds to the case's case_id
+    sub_test_id: Optional[str] = ''  # corresponds to the case's case_id
     name: str
     status: Optional[TestStatus] = TestStatus.PENDING
     metrics: Optional[Dict[str, Any]] = {}
diff --git a/webqa_agent/executor/case_executor.py b/webqa_agent/executor/case_executor.py
index b579603..208800d 100644
--- a/webqa_agent/executor/case_executor.py
+++ b/webqa_agent/executor/case_executor.py
@@ -15,6 +15,7 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 
+from webqa_agent.actions.action_handler import screenshot_prefix_var
 from webqa_agent.browser import BrowserSession, BrowserSessionPool
 from webqa_agent.data import (CaseStep, StepContext, SubTestResult, SubTestStep,
                               TestConfiguration, TestStatus)
@@ -99,9 +100,12 @@ async def worker(worker_id: int):
 
                 # Set test_id context for logging (imitating graph.py style)
                 # Including both ID and Name for maximum clarity
-                log_context = f'Run Case Test | {case_id} | {case_name}'
+                log_context = f'{case_id} | {case_name}'
                 token = test_id_var.set(log_context)
 
+                # Set screenshot prefix to avoid filename collisions in parallel execution
+                prefix_token = screenshot_prefix_var.set(case_id)
+
                 session = None
                 case_result = None
@@ -110,7 +114,7 @@ async def worker(worker_id: int):
                 try:
                     session = await session_pool.acquire(browser_config=browser_cfg, timeout=120.0)
 
                     with Display.display(case_name):  # pylint: disable=not-callable
-                        case_result = await self.execute_single_case(session=session, case=case, case_index=idx)
+                        case_result, raw_monitoring_data = await self.execute_single_case(session=session, case=case, case_index=idx)
 
                     async with results_lock:
                         results.append(case_result)
@@ -134,14 +138,17 @@ async def worker(worker_id: int):
                         final_summary=f'Exception: {str(e)}',
                         report=[],
                     ))
+                    case_result = None
+                    raw_monitoring_data = None
 
                 finally:
-                    # Reset test_id context
+                    # Reset context variables
                     test_id_var.reset(token)
+                    screenshot_prefix_var.reset(prefix_token)
 
                     if case_result is not None:
                         case_config = case.get('_config', {})
-                        self._save_case_result(case_result, case_name, idx, case_config=case_config)
+                        self._save_case_result(case_result, case_name, idx, raw_monitoring_data=raw_monitoring_data, case_config=case_config)
                         self._clear_case_screenshots(case_result)
 
                     if session:
@@ -205,8 +212,10 @@ async def execute_single_case(self, session: BrowserSession, case: Dict[str, Any
 
         # Build final result
         end_time = datetime.now()
-        return self._build_case_result(
+        case_id = case.get('case_id', f'case_{case_index}')
+        case_result, raw_monitoring_data = self._build_case_result(
             case_name=case_name,
+            case_id=case_id,
             case_status=case_status,
             executed_steps=executed_steps,
             error_messages=error_messages,
@@ -215,6 +224,7 @@ async def execute_single_case(self, session: BrowserSession, case: Dict[str, Any
             end_time=end_time,
             ignore_rules=ignore_rules
         )
+        return case_result, raw_monitoring_data
 
     # ========================================================================
     # Private Methods - Tester Lifecycle
@@ -369,10 +379,15 @@ async def _execute_action_step(
                 full_page=True
             )
 
+            if execution_steps_dict.get('screenshots_paths'):
+                final_screenshots = execution_steps_dict.get('screenshots_paths')
+            else:
+                final_screenshots = execution_steps_dict.get('screenshots')
+
             step_result = SubTestStep(
                 id=step_idx,
                 description=f'action: {action.description}',
-                screenshots=execution_steps_dict.get('screenshots', []),
+                screenshots=final_screenshots,
                 modelIO=str(execution_steps_dict.get('modelIO', {})),
                 actions=execution_steps_dict.get('actions', []),
                 status=execution_steps_dict.get('status', TestStatus.PASSED),
@@ -431,10 +446,15 @@ async def _execute_verify_step(
                 full_page=False
             )
 
+            if verification_step.get('screenshots_paths'):
+                final_screenshots = verification_step.get('screenshots_paths')
+            else:
+                final_screenshots = verification_step.get('screenshots')
+
             step_result = SubTestStep(
                 id=step_idx,
                 description=f'verify: {verify.assertion}',
-                screenshots=verification_step.get('screenshots', []),
+                screenshots=final_screenshots,
                 modelIO=str(verification_step.get('modelIO', {})),
                 actions=verification_step.get('actions', []),
                 status=verification_step.get('status', TestStatus.PASSED),
@@ -551,6 +571,7 @@ def _check_monitoring_errors(
     def _build_case_result(
         self,
         case_name: str,
+        case_id: str,
         case_status: TestStatus,
         executed_steps: List[SubTestStep],
         error_messages: List[str],
@@ -558,11 +579,12 @@ def _build_case_result(
         start_time: datetime,
         end_time: datetime,
         ignore_rules: Optional[Dict[str, Any]] = None
-    ) -> SubTestResult:
+    ) -> Tuple[SubTestResult, Dict[str, Any]]:
         """Build final case result with monitoring check.
 
         Args:
             case_name: Name of the case
+            case_id: ID of the case (e.g., case_1, case_2)
             case_status: Current case status
             executed_steps: List of executed steps
             error_messages: List of error messages
@@ -572,7 +594,7 @@ def _build_case_result(
             ignore_rules: Optional ignore rules for this specific case
 
         Returns:
-            Complete SubTestResult
+            Tuple of (SubTestResult, raw_monitoring_data)
         """
         # Build case summary
         total_steps = len(executed_steps)
@@ -593,7 +615,8 @@ def _build_case_result(
         if error_messages:
             final_summary += f". Errors: {'; '.join(error_messages)}"
 
-        return SubTestResult(
+        result = SubTestResult(
+            sub_test_id=case_id,
             name=case_name,
             status=case_status,
             metrics={'total_steps': total_steps, 'passed_steps': passed_steps, 'failed_steps': failed_steps},
@@ -605,6 +628,9 @@ def _build_case_result(
             report=[],
         )
 
+        # Return result and raw monitoring data separately for explicit data flow
+        return result, monitoring_data
+
     # ========================================================================
     # Private Methods - File Operations
     # ========================================================================
@@ -614,6 +640,7 @@ def _save_case_result(
         self,
         case_result: SubTestResult,
         case_name: str,
         case_index: int,
+        raw_monitoring_data: Optional[Dict[str, Any]] = None,
         case_config: Optional[Dict[str, Any]] = None
     ) -> None:
         """Save case result to JSON file.
 
         Args:
             case_result: The case result to save
             case_name: Name of the case (for filename sanitization)
             case_index: Index of the case (for ordering in report)
+            raw_monitoring_data: Raw monitoring data to save separately
             case_config: Optional case-specific config (for multi-YAML support)
         """
         if self.report_dir is None:
@@ -644,6 +672,11 @@ def _save_case_result(
             # Add config information for template compatibility
             case_dict = case_result.model_dump()
             case_dict['case_index'] = case_index  # Save index for ordering
+
+            # Remove monitoring data from messages to reduce file size
+            # Monitoring data is saved separately in *_monitor.json file
+            case_dict['messages'] = {}
+
             case_dict['config'] = {
                 'target_url': target_url,
                 'browser_config': case_config.get('browser_config') if case_config else self.browser_config,
@@ -661,6 +694,24 @@ def _save_case_result(
             with open(case_result_path, 'w', encoding='utf-8') as f:
                 json.dump([case_dict], f, indent=2, ensure_ascii=False, default=str)
             logging.debug(f'Case result saved to: {case_result_path}')
+
+            # Save monitoring data separately to a corresponding JSON file
+            if raw_monitoring_data is not None:
+                try:
+                    monitoring_data_path = report_dir_path / f'test_data_{case_index:03d}_{safe_case_name}_monitor.json'
+                    sub_test_id = case_result.sub_test_id or f'case_{case_index}'
+                    monitoring_dict = {
+                        'sub_test_id': sub_test_id,
+                        'name': case_name,
+                        'corresponding_file': f'test_data_{case_index:03d}_{safe_case_name}.json',
+                        'monitoring_data': raw_monitoring_data,
+                        'timestamp': datetime.now().isoformat()
+                    }
+                    with open(monitoring_data_path, 'w', encoding='utf-8') as f:
+                        json.dump(monitoring_dict, f, indent=2, ensure_ascii=False, default=str)
+                    logging.debug(f'Monitoring data saved to: {monitoring_data_path}')
+                except Exception as e:
+                    logging.warning(f'Failed to save monitoring data for case "{case_name}": {e}')
         except Exception as mk_err:
             logging.warning(f"Cannot save case result to '{self.report_dir}': {mk_err}")
@@ -669,20 +720,27 @@ def _clear_case_screenshots(self, case_result: SubTestResult) -> None:
 
         This significantly reduces memory usage
         when executing many cases, as screenshot data is no longer needed in memory after being saved.
+        However, if the screenshots are base64 strings and we're not saving
+        them as files, we MUST keep them in memory for the final report.
 
         Args:
             case_result: Case result to clear screenshots from
         """
         try:
-            # Clear screenshots from each step
+            # Clear screenshots from each step ONLY if they are file paths
+            # Base64 screenshots must be preserved for the final aggregated report
             for step in case_result.steps:
                 if step.screenshots:
-                    step.screenshots = []  # Clear screenshot data
+                    # If any screenshot in the step is a path, it's safe to clear
+                    # because the path is already stored in the JSON.
+                    # If they are base64, clearing them will make the final report empty.
+                    if any(s.type == 'path' for s in step.screenshots):
+                        step.screenshots = []
 
                 # Also clear modelIO if it's very large (can contain duplicate data)
                 if step.modelIO and len(step.modelIO) > 10000:
                     step.modelIO = '[cleared after save]'
 
-            logging.debug(f'Cleared screenshot data for case: {case_result.name}')
+            logging.debug(f'Cleared screenshot paths for case: {case_result.name}')
         except Exception as e:
             logging.warning(f'Failed to clear screenshots: {e}')
diff --git a/webqa_agent/executor/case_mode.py b/webqa_agent/executor/case_mode.py
index e4a668a..9baafbc 100644
--- a/webqa_agent/executor/case_mode.py
+++ b/webqa_agent/executor/case_mode.py
@@ -12,6 +12,7 @@
 
 from pydantic import ValidationError
 
+from webqa_agent.actions.action_handler import ActionHandler
 from webqa_agent.browser.config import DEFAULT_CONFIG
 from webqa_agent.data import (ParallelTestSession, TestConfiguration,
                               TestResult, TestStatus)
@@ -133,9 +134,33 @@ async def run(
 
         # Set up report directory
         report_ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')
-        report_dir = os.path.join('.', 'reports', f'test_{report_ts}')
         os.environ['WEBQA_REPORT_TIMESTAMP'] = report_ts
 
+        # Initialize screenshot directory for this test session
+        # Clear any existing session state to ensure isolation
+        ActionHandler.clear_screenshot_session()
+
+        report_dir = report_config.get('report_dir') if report_config else None
+        # Handle null, None, empty string, or missing value
+        if not report_dir or (isinstance(report_dir, str) and report_dir.strip() == ''):
+            # Use default reports/test_{timestamp}/ directory
+            report_dir = os.path.join('reports', f'test_{report_ts}')
+
+        # Update report_config with the resolved report_dir for consistency
+        if report_config is not None:
+            report_config['report_dir'] = report_dir
+        else:
+            report_config = {'report_dir': report_dir, 'language': 'en-US'}
+
+        test_session.report_path = report_dir
+
+        # Configure screenshot saving behavior
+        save_screenshots = report_config.get('save_screenshots', False)
+        ActionHandler.set_screenshot_config(save_screenshots=save_screenshots)
+
+        ActionHandler.init_screenshot_session(custom_report_dir=report_dir)
+        logging.info(f'📸 Screenshot directory initialized for report: {report_dir}')
+
         test_config = TestConfiguration(
             test_id=str(uuid.uuid4()),
             # test_type=TestType.UI_AGENT_LANGGRAPH,
@@ -307,6 +332,47 @@ def _generate_html_with_jinja2(
                     test_data.extend(file_data)
             except Exception as e:
                 logging.warning(f'Failed to read test data file: {data_file}, error: {str(e)}')
+
+        # Load monitoring data files and merge with corresponding case data
+        monitoring_files = sorted(glob.glob(os.path.join(report_dir, '*_monitor.json')))
+        monitoring_data_by_sub_test_id = {}
+        for monitoring_file in monitoring_files:
+            try:
+                with open(monitoring_file, 'r', encoding='utf-8') as f:
+                    monitoring_data = json.load(f)
+                monitoring_content = monitoring_data.get('monitoring_data', {})
+                sub_test_id = monitoring_data.get('sub_test_id')
+                if sub_test_id:
+                    monitoring_data_by_sub_test_id[sub_test_id] = monitoring_content
+            except Exception as e:
+                logging.warning(f'Failed to read monitoring data file: {monitoring_file}, error: {str(e)}')
+
+        # Merge monitoring data into corresponding case data
+        for case in test_data:
+            case_sub_test_id = case.get('sub_test_id')
+            raw_monitoring = None
+
+            # Match by sub_test_id
+            if case_sub_test_id and case_sub_test_id in monitoring_data_by_sub_test_id:
+                raw_monitoring = monitoring_data_by_sub_test_id[case_sub_test_id]
+
+            if raw_monitoring:
+                # Rebuild messages field from monitoring data for template compatibility
+                # Convert monitoring data to template-expected format
+                console_errors = raw_monitoring.get('console', [])
+                network_data = raw_monitoring.get('network', {
+                    'responses': [],
+                    'failed_requests': []
+                })
+                case['messages'] = {
+                    'console_error_message': console_errors,
+                    'network_message': network_data
+                }
+                logging.debug(f'Merged monitoring data for case {case_sub_test_id}: {case.get("name", "unknown")}')
+            else:
+                # If no monitoring data, ensure messages is an empty dict
+                if 'messages' not in case or not case.get('messages'):
+                    case['messages'] = {}
     except Exception as e:
         logging.error(f'Failed to merge test data: {str(e)}')
         return None
diff --git a/webqa_agent/executor/parallel_executor.py b/webqa_agent/executor/parallel_executor.py
index b06861a..f50b994 100644
--- a/webqa_agent/executor/parallel_executor.py
+++ b/webqa_agent/executor/parallel_executor.py
@@ -188,6 +188,10 @@ async def _execute_single_test(
         # Set test-level context for logging. This will be overridden by case-level context where applicable.
         token = test_id_var.set(test_config.test_name)
 
+        # Set screenshot prefix to avoid collisions. LangGraph workers will override this with case_id.
+        from webqa_agent.actions.action_handler import screenshot_prefix_var
+        prefix_token = screenshot_prefix_var.set(test_config.test_id or test_config.test_name)
+
         async with semaphore:
             test_context = test_session.test_contexts[test_config.test_id]
             test_context.start_execution()
@@ -201,7 +205,7 @@ async def _execute_single_test(
                 if test_config.test_type == TestType.UI_AGENT_LANGGRAPH:
                     # LangGraph tests manage sessions internally via session pool
                     session = None
-                    test_context.session_id = "langgraph_pool_mode"
+                    test_context.session_id = 'langgraph_pool_mode'
 
                 elif test_config.test_type in [
                     TestType.UX_TEST,
@@ -303,8 +307,9 @@ async def _execute_single_test(
             # Release browser session back to pool
             if session is not None:
                 await self.session_pool.release(session, failed=browser_failed)
-            # Reset test_id context
+            # Reset context variables
             test_id_var.reset(token)
+            screenshot_prefix_var.reset(prefix_token)
 
     def _resolve_test_dependencies(self, tests: List[TestConfiguration]) -> List[List[TestConfiguration]]:
         """Resolve test dependencies and return execution batches.
@@ -377,16 +382,16 @@ async def _finalize_session(self, test_session: ParallelTestSession):
         test_session.aggregated_results = aggregated_results
 
         # Generate JSON & HTML reports
-        report_path = await self.result_aggregator.generate_json_report(test_session)
-        test_session.report_path = report_path
+        json_path = await self.result_aggregator.generate_json_report(test_session)
+        # test_session.report_path = report_path
 
-        report_dir = os.path.dirname(report_path)
+        report_dir = os.path.dirname(json_path)
         html_path = self.result_aggregator.generate_html_report_fully_inlined(
             test_session,
             report_dir=report_dir
         )
         test_session.html_report_path = html_path
 
-        logging.debug(f'Report generated: {report_path}')
+        logging.debug(f'Report generated: {json_path}')
         logging.debug(f'HTML report generated: {html_path}')
 
         # Mark session as completed if not already done
diff --git a/webqa_agent/executor/parallel_mode.py b/webqa_agent/executor/parallel_mode.py
index 211c130..535e207 100644
--- a/webqa_agent/executor/parallel_mode.py
+++ b/webqa_agent/executor/parallel_mode.py
@@ -2,8 +2,9 @@
 import os
 import uuid
 from datetime import datetime
-from typing import Any, Coroutine, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
+from webqa_agent.actions.action_handler import ActionHandler
 from webqa_agent.browser.config import DEFAULT_CONFIG
 from webqa_agent.data import (ParallelTestSession, TestConfiguration,
                               TestType, get_default_test_name)
@@ -43,9 +44,14 @@ async def run(
             Tuple of (aggregated_results, report_path)
         """
         try:
+            # Use default config if none provided
+            if not log_cfg:
+                log_cfg = {'level': 'info'}
+            if not report_cfg:
+                report_cfg = {'language': 'en-US'}
 
-            GetLog.get_log(log_level=log_cfg['level'])
-            Display.init(language=report_cfg['language'])
+            GetLog.get_log(log_level=log_cfg.get('level', 'info'))
+            Display.init(language=report_cfg.get('language', 'en-US'))
             Display.display.start()
 
             logging.info(f"{icon['rocket']} Starting tests for URL: {url}, parallel mode {self.max_concurrent_tests}")
@@ -61,6 +67,27 @@ async def run(
             report_ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')
             os.environ['WEBQA_REPORT_TIMESTAMP'] = report_ts
 
+            # Initialize screenshot directory for this test session
+            # Clear any existing session state to ensure isolation
+            ActionHandler.clear_screenshot_session()
+
+            # Use report_cfg to determine report directory, update it if missing
+            custom_report_dir = report_cfg.get('report_dir')
+            # Handle null, None, empty string, or missing value
+            if not custom_report_dir or (isinstance(custom_report_dir, str) and custom_report_dir.strip() == ''):
+                # Use default reports/test_{timestamp}/ directory
+                custom_report_dir = os.path.join('reports', f'test_{report_ts}')
+                report_cfg['report_dir'] = custom_report_dir
+
+            test_session.report_path = custom_report_dir
+
+            # Configure screenshot saving behavior
+            save_screenshots = report_cfg.get('save_screenshots', False)
+            ActionHandler.set_screenshot_config(save_screenshots=save_screenshots)
+
+            ActionHandler.init_screenshot_session(custom_report_dir=custom_report_dir)
+            logging.info(f'📸 Screenshot directory initialized for report: {custom_report_dir}')
+
             # Configure tests based on input or legacy test objects
             if test_configurations:
                 self._configure_tests_from_config(test_session, test_configurations, browser_config, report_cfg)
@@ -70,10 +97,8 @@ async def run(
 
             result = completed_session.aggregated_results.get('count', {})
 
-            await Display.display.stop()
             Display.display.render_summary()
 
-            # Return results in format compatible with existing code
             return (
                 completed_session.aggregated_results,
                 completed_session.report_path,
@@ -105,7 +130,7 @@ def _configure_tests_from_config(
             test_config = TestConfiguration(
                 test_id=str(uuid.uuid4()),
                 test_type=test_type,
-                test_name=get_default_test_name(test_type, report_cfg['language']),
+                test_name=get_default_test_name(test_type, report_cfg.get('language', 'zh-CN')),
                 enabled=config.get('enabled', True),
                 browser_config=browser_config,
                 report_config=report_cfg,
diff --git a/webqa_agent/executor/result_aggregator.py b/webqa_agent/executor/result_aggregator.py
index 79db916..14fe897 100644
--- a/webqa_agent/executor/result_aggregator.py
+++ b/webqa_agent/executor/result_aggregator.py
@@ -1,6 +1,7 @@
 import json
 import logging
 import os
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
@@ -23,6 +24,7 @@ def __init__(self, report_config: dict = None):
             'zh-CN': i18n.get_lang_data('zh-CN').get('aggregator', {}),
             'en-US': i18n.get_lang_data('en-US').get('aggregator', {}),
         }
+        self.report_dir = report_config.get('report_dir') if report_config else None
 
     def _get_text(self, key: str) -> str:
         """Get localized text for the given key."""
@@ -246,8 +248,13 @@ async def generate_json_report(self, test_session: ParallelTestSession, report_d
         try:
             # Determine report directory
             if report_dir is None:
-                timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP') or os.getenv('WEBQA_TIMESTAMP')
-                report_dir = os.path.join('.', 'reports', f'test_{timestamp}')
+                # Priority: 1. test_session.report_path 2. self.report_dir 3. fallback env-based
+                report_dir = test_session.report_path or self.report_dir
+
+                if not report_dir:
+                    timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP') or os.getenv('WEBQA_TIMESTAMP') or datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
+                    report_dir = os.path.join('reports', f'test_{timestamp}')
+
             os.makedirs(report_dir, exist_ok=True)
 
             json_path = os.path.join(report_dir, 'test_results.json')
@@ -353,8 +360,13 @@ def generate_html_report_fully_inlined(self, test_session, report_dir: str | Non
             )
 
         if report_dir is None:
-            timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP') or os.getenv('WEBQA_TIMESTAMP')
-            report_dir = os.path.join('.', 'reports', f'test_{timestamp}')
+            # Priority: 1. test_session.report_path 2. self.report_dir 3. fallback env-based
+            report_dir = test_session.report_path or self.report_dir
+
+            if not report_dir:
+                timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP') or os.getenv('WEBQA_TIMESTAMP') or datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
+                report_dir = os.path.join('reports', f'test_{timestamp}')
+
        # Ensure report dir exists; if creation fails, fallback to temp dir
        try:
            os.makedirs(report_dir, exist_ok=True)
diff --git a/webqa_agent/executor/test_runners.py b/webqa_agent/executor/test_runners.py
index 39918f7..364774d 100644
--- a/webqa_agent/executor/test_runners.py
+++ b/webqa_agent/executor/test_runners.py
@@ -86,6 +86,7 @@ async def run_test(
             # Infrastructure
             'session_pool': session_pool,
             'llm_config': llm_config,
+            'report_config': test_config.report_config,
         }
 
         graph_config = {'recursion_limit': 100}
@@ -131,9 +132,24 @@ async def run_test(
             for step_data in case_steps_raw:
                 # Convert screenshot data
                 screenshots = []
-                for scr in step_data.get('screenshots', []):
-                    if isinstance(scr, dict) and scr.get('type') == 'base64':
-                        screenshots.append(SubTestScreenshot(type='base64', data=scr.get('data', '')))
+                # Prefer path-based screenshots when available
+                if step_data.get('screenshots_paths'):
+                    for scr in step_data.get('screenshots_paths', []):
+                        if isinstance(scr, dict) and 'type' in scr and 'data' in scr:
+                            screenshots.append(SubTestScreenshot(
+                                type=scr['type'],
+                                data=scr['data'],
+                                label=scr.get('label')
+                            ))
+                else:
+                    # Fall back to base64 screenshots
+                    for scr in step_data.get('screenshots', []):
+                        if isinstance(scr, dict) and 'type' in scr and 'data' in scr:
+                            screenshots.append(SubTestScreenshot(
+                                type=scr['type'],
+                                data=scr['data'],
+                                label=scr.get('label')
+                            ))
 
                 # Convert status
                 step_status_str = step_data.get('status', 'passed').lower()
diff --git a/webqa_agent/testers/basic_tester.py b/webqa_agent/testers/basic_tester.py
index 1f414f6..355c022 100644
--- a/webqa_agent/testers/basic_tester.py
+++ b/webqa_agent/testers/basic_tester.py
@@ -226,18 +226,38 @@ async def run(self, url: str, page: Page, clickable_elements: dict, **kwargs) ->
                 screenshots = []
 
                 click_result = await click_handler.click_and_screenshot(page, element, highlight_id)
+
+                # Handle screenshot after click
+                if click_result.get('screenshot_after_path'):
+                    screenshots.append(SubTestScreenshot(type='path', data=click_result['screenshot_after_path'], label='After Click'))
+
                 if click_result.get('screenshot_after'):
                     scr = click_result['screenshot_after']
-                    if isinstance(scr, str):
-                        screenshots.append(SubTestScreenshot(type='base64', data=scr))
-                    elif isinstance(scr, dict):
-                        screenshots.append(SubTestScreenshot(**scr))
+                    if not click_result.get('screenshot_after_path'):
+                        if isinstance(scr, str) and scr.startswith('data:image'):
+                            screenshots.append(SubTestScreenshot(type='base64', data=scr, label='After Click'))
+                        elif isinstance(scr, dict) and scr.get('data'):
+                            screenshots.append(SubTestScreenshot(
+                                type=scr.get('type', 'base64'),
+                                data=scr['data'],
+                                label=scr.get('label', 'After Click')
+                            ))
+
+                # Handle new page screenshot
+                if click_result.get('new_page_screenshot_path'):
+                    screenshots.append(SubTestScreenshot(type='path', data=click_result['new_page_screenshot_path'], label='New Page'))
+
                 if click_result.get('new_page_screenshot'):
                     scr = click_result['new_page_screenshot']
-                    if isinstance(scr, str):
-                        screenshots.append(SubTestScreenshot(type='base64', data=scr))
-                    elif isinstance(scr, dict):
-                        screenshots.append(SubTestScreenshot(**scr))
+                    if not click_result.get('new_page_screenshot_path'):
+                        if isinstance(scr, str) and scr.startswith('data:image'):
+                            screenshots.append(SubTestScreenshot(type='base64', data=scr, label='New Page'))
+                        elif isinstance(scr, dict) and scr.get('data'):
+                            screenshots.append(SubTestScreenshot(
+                                type=scr.get('type', 'base64'),
+                                data=scr['data'],
+                                label=scr.get('label', 'New Page')
+                            ))
 
                 business_success = click_result['success']
                 step = SubTestStep(
diff --git a/webqa_agent/testers/case_gen/agents/execute_agent.py b/webqa_agent/testers/case_gen/agents/execute_agent.py
index 11fd8b2..1bb8de8 100644
--- a/webqa_agent/testers/case_gen/agents/execute_agent.py
+++ b/webqa_agent/testers/case_gen/agents/execute_agent.py
@@ -1278,7 +1278,7 @@ async def agent_worker_node(state: dict, config: dict) -> dict:
                 page = ui_tester_instance.browser_session.page
                 dp = DeepCrawler(page)
                 await dp.crawl(highlight=True, viewport_only=True)
-                screenshot = await ui_tester_instance._actions.b64_page_screenshot(
+                screenshot, _ = await ui_tester_instance._actions.b64_page_screenshot(
                     file_name=f'step_{i + 1}_vision',
                     context='agent'
                 )
@@ -1454,7 +1454,7 @@ async def agent_worker_node(state: dict, config: dict) -> dict:
 
                 # Get current page screenshot for LLM analysis
                 try:
-                    recovery_screenshot = await ui_tester_instance._actions.b64_page_screenshot(
+                    recovery_screenshot, _ = await ui_tester_instance._actions.b64_page_screenshot(
                         file_name=f'step_{i + 1}_recovery_attempt_{retry_count + 1}',
                         context='error'
                     )
@@ -1539,7 +1539,7 @@ async def agent_worker_node(state: dict, config: dict) -> dict:
             # Prepare context for LLM recovery (use correct variable: ui_tester_instance)
             screenshot_b64 = None
             try:
-                screenshot_b64 = await ui_tester_instance._actions.b64_page_screenshot(
+                screenshot_b64, _ = await ui_tester_instance._actions.b64_page_screenshot(
                     file_name=f'recovery_step_{i + 1}',
                     context='adaptive_recovery'
                 )
@@ -1653,7 +1653,7 @@ async def agent_worker_node(state: dict, config: dict) -> dict:
             try:
                 # Capture screenshot for visual context after successful step execution
                 logging.debug('Capturing screenshot for dynamic step generation context')
-                screenshot = await ui_tester_instance._actions.b64_page_screenshot()
+                screenshot, _ = await ui_tester_instance._actions.b64_page_screenshot()
 
                 # Enhance objective with generation context for smarter LLM decision-making
                 enhanced_objective = case.get('objective', '')
diff --git a/webqa_agent/testers/case_gen/graph.py b/webqa_agent/testers/case_gen/graph.py
index b113ce0..a35ede2 100644
--- a/webqa_agent/testers/case_gen/graph.py
+++ b/webqa_agent/testers/case_gen/graph.py
@@ -73,7 +73,7 @@ async def plan_test_cases(state: MainGraphState) -> Dict[str, List[Dict[str, Any
         page_type = getattr(crawl_result, 'page_type', 'unknown')
         logging.warning(f'Initial page type ({page_type}) is unsupported, cannot generate test cases')
         return {'test_cases': []}
-    screenshot = await ui_tester._actions.b64_page_screenshot(
+    screenshot, _ = await ui_tester._actions.b64_page_screenshot(
         full_page=True,
         file_name='plan_full_page',
         context='agent'
@@ -290,7 +290,10 @@ async def plan_test_cases(state: MainGraphState) -> Dict[str, List[Dict[str, Any
 
     try:
         timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP')
-        report_dir = f'./reports/test_{timestamp}'
+        report_dir = (state.get('report_config') or {}).get('report_dir')
+        if not report_dir:
+            timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP')
+            report_dir = os.path.join('reports', f'test_{timestamp}')
         os.makedirs(report_dir, exist_ok=True)
         cases_path = os.path.join(report_dir, 'cases.json')
         with open(cases_path, 'w', encoding='utf-8') as f:
@@ -369,6 +372,12 @@ async def worker(worker_id: int):
                 # Set logging context (case_id + case_name, easy to grep and identify)
                 log_context = f'AI Function Test | {case_id}'
                 token = test_id_var.set(log_context)
+
+                # Set screenshot prefix to avoid filename collisions in parallel execution
+                from webqa_agent.actions.action_handler import \
+                    screenshot_prefix_var
+                prefix_token = screenshot_prefix_var.set(case_id)
+
                 try:
                     logging.info(f"Worker {worker_id}: Starting case '{case_name}'" + (' [REPLANNED]' if is_replanned else ''))
@@ -481,7 +490,11 @@ async def worker(worker_id: int):
                     # Save the updated cases.json
                     try:
                         timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP')
-                        report_dir = f'./reports/test_{timestamp}'
+                        # report_dir = os.path.join('reports', f'test_{timestamp}')
+                        report_dir = (state.get('report_config') or {}).get('report_dir')
+                        if not report_dir:
+                            timestamp = os.getenv('WEBQA_REPORT_TIMESTAMP')
+                            report_dir = os.path.join('reports', f'test_{timestamp}')
                         os.makedirs(report_dir, exist_ok=True)
                         cases_path = os.path.join(report_dir, 'cases.json')
                         with open(cases_path, 'w', encoding='utf-8') as f:
@@ -524,6 +537,8 @@ async def worker(worker_id: int):
                 finally:
                     # Reset logging context
                     test_id_var.reset(token)
+                    screenshot_prefix_var.reset(prefix_token)
+
                     # Release or close session based on remaining work
                     if s:
                         # Check if there are more cases waiting in the queue
@@ -580,7 +595,7 @@ async def _do_reflection(ui_tester: UITester, state: dict, case_name: str) -> di
         str(ElementKey.ATTRIBUTES), str(ElementKey.CENTER_X), str(ElementKey.CENTER_Y)
     ]
     page_content_summary = curr.clean_dict(reflect_template)
-    screenshot = await ui_tester._actions.b64_page_screenshot(
+    screenshot, _ = await ui_tester._actions.b64_page_screenshot(
         full_page=True, file_name=f'reflection_{case_name}', context='agent'
     )
     await dp.remove_marker()
diff --git a/webqa_agent/testers/case_gen/state/schemas.py b/webqa_agent/testers/case_gen/state/schemas.py
index 2f8fb2e..ebbe614 100644
--- a/webqa_agent/testers/case_gen/state/schemas.py
+++ b/webqa_agent/testers/case_gen/state/schemas.py
@@ -26,6 +26,7 @@ class MainGraphState(TypedDict):
     # Infrastructure
     session_pool: Any  # BrowserSessionPool instance
     llm_config: Optional[dict]  # LLM config for creating UITester
+    report_config: Optional[dict]  # Report config
 
     # Output
     final_report: Optional[dict]
diff --git a/webqa_agent/testers/case_gen/tools/element_action_tool.py b/webqa_agent/testers/case_gen/tools/element_action_tool.py
index 56d4e22..e8b0128 100644
--- a/webqa_agent/testers/case_gen/tools/element_action_tool.py
+++ b/webqa_agent/testers/case_gen/tools/element_action_tool.py
@@ -110,7 +110,7 @@ async def get_full_page_context(
         screenshot = None
         if include_screenshot:
             logging.debug('Capturing post-action screenshot')
-            screenshot = await self.ui_tester_instance._actions.b64_page_screenshot(
+            screenshot, _ = await self.ui_tester_instance._actions.b64_page_screenshot(
                 full_page=not viewport_only,
                 file_name='ui_error_check',
                 context='error'
@@ -237,6 +237,7 @@ async def _arun(
             # execution_steps is a dict with structure: {"actions": [...], "screenshots": [...], "status": "...", ...}
             # Extract screenshots and actions from the dict
             screenshots = execution_steps.get('screenshots', [])
+            screenshots_paths = execution_steps.get('screenshots_paths', [])
             actions = execution_steps.get('actions', [])
             step_status = execution_steps.get('status', 'passed')
             model_io = execution_steps.get('modelIO', '')
@@ -245,6 +246,7 @@ async def _arun(
             recorder.add_step(
                 description=instruction,
                 screenshots=screenshots,
+                screenshots_paths=screenshots_paths,
                 model_io=model_io,
                 actions=actions,
                 status=step_status,
@@ -527,6 +529,7 @@ async def _arun(self, assertion: str, focus_region: Optional[str] = None) -> str:
             # execution_steps is a dict with structure: {"actions": [...], "screenshots": [...], "status": "...", ...}
             # Extract screenshots and actions from the dict
             screenshots = execution_steps.get('screenshots', [])
+            screenshots_paths = execution_steps.get('screenshots_paths', [])
             actions = execution_steps.get('actions', [])
             step_status = execution_steps.get('status', 'passed')
             model_io = execution_steps.get('modelIO', '')
@@ -535,6 +538,7 @@ async def _arun(self, assertion: str, focus_region: Optional[str] = None) -> str:
             recorder.add_step(
                 description=f'Verify: {assertion}',
                 screenshots=screenshots,
+                screenshots_paths=screenshots_paths,
                 model_io=model_io,
                 actions=actions,
                 status=step_status,
diff --git a/webqa_agent/testers/case_gen/tools/ux_tool.py b/webqa_agent/testers/case_gen/tools/ux_tool.py
index 7ce416b..23ae3bc 100644
--- a/webqa_agent/testers/case_gen/tools/ux_tool.py
+++ b/webqa_agent/testers/case_gen/tools/ux_tool.py
@@ -114,7 +114,6 @@ async def _arun(self, assertion: str) -> str:
             screenshot = None
             img_bytes = await page.screenshot(full_page=True)
             screenshot = f"data:image/png;base64,{base64.b64encode(img_bytes).decode('utf-8')}"
-
             try:
                 parsed_structure = json.loads(viewport_structure)
             except Exception:
diff --git a/webqa_agent/testers/case_gen/utils/case_recorder.py b/webqa_agent/testers/case_gen/utils/case_recorder.py
index c7f5e0f..c755aed 100644
--- a/webqa_agent/testers/case_gen/utils/case_recorder.py
+++ b/webqa_agent/testers/case_gen/utils/case_recorder.py
@@ -36,7 +36,7 @@ def start_case(self, case_name: str, case_data: dict | None = None):
         self.current_case_steps = []
         self.step_counter = 0
 
-    def add_step(self, *, description: str, screenshots: list | None = None, model_io: str | dict | None = None,
+    def add_step(self, *, description: str, screenshots: list | None = None, screenshots_paths: list | None = None, model_io: str | dict | None = None,
                  actions: list | None = None, status: str = 'passed', step_type: str = 'action', end_time: str | None = None):
         """Add a step to the current case recording.
@@ -44,6 +44,7 @@ def add_step(self, *, description: str, screenshots: list | None = None, model_i
         Args:
             description: Step description
             screenshots: List of SubTestScreenshot objects or dicts with {"type": "base64", "data": "..."}
+            screenshots_paths: List of dicts with {"type": "path", "data": "..."}
             model_io: Model input/output, can be string or dict (will be converted to JSON string)
             actions: List of actions
             status: Step status ("passed", "failed", "warning")
@@ -62,14 +63,27 @@ def add_step(self, *, description: str, screenshots: list | None = None, model_i
 
         # Normalize screenshots to dict format for storage
         normalized_screenshots = []
-        for scr in screenshots:
-            if isinstance(scr, SubTestScreenshot):
-                normalized_screenshots.append({'type': scr.type, 'data': scr.data})
-            elif isinstance(scr, dict) and 'type' in scr and 'data' in scr:
-                normalized_screenshots.append(scr)
-            else:
-                # Skip invalid screenshot formats
-                continue
+        normalized_screenshots_paths = []
+
+        # Process paths if provided
+        if screenshots_paths:
+            for scr in screenshots_paths:
+                if isinstance(scr, dict) and 'type' in scr and isinstance(scr.get('data'), str):
+                    normalized_screenshots_paths.append(scr)
+                else:
+                    # Skip invalid screenshot formats
+                    continue
+
+        # Process base64 screenshots if provided
+        if screenshots:
+            for scr in screenshots:
+                if isinstance(scr, SubTestScreenshot):
+                    normalized_screenshots.append({'type': scr.type, 'data': scr.data, 'label': scr.label})
+                elif isinstance(scr, dict) and 'type' in scr and 'data' in scr:
+                    normalized_screenshots.append(scr)
+                else:
+                    # Skip invalid screenshot formats
+                    continue
 
         # Ensure modelIO is a string (align with runner format)
         if isinstance(model_io, str):
@@ -86,6 +100,7 @@ def add_step(self, *, description: str, screenshots: list | None = None, model_i
             'type': step_type,
             'description': description or '',
             'screenshots': normalized_screenshots,
+            'screenshots_paths': normalized_screenshots_paths,
             'modelIO': model_io_str,
             'actions': actions,
             'status': status,
@@ -122,9 +137,34 @@ def to_subtest_result(self, name: str, language: str = 'zh-CN') -> SubTestResult:
         for s in self.current_case_steps:
             # Convert screenshots
             screenshots_models: List[SubTestScreenshot] = []
-            for scr in s.get('screenshots', []) or []:
-                if isinstance(scr, dict) and scr.get('type') == 'base64' and isinstance(scr.get('data'), str):
-                    screenshots_models.append(SubTestScreenshot(type='base64', data=scr['data']))
+
+            paths = s.get('screenshots_paths', []) or []
+            base64s = s.get('screenshots', []) or []
+
+            # Use paths if available, otherwise base64 for each index
+            max_len = max(len(paths), len(base64s))
+            for i in range(max_len):
+                added = False
+                # Try to use path first
+                if i < len(paths):
+                    scr = paths[i]
+                    if isinstance(scr, dict) and 'type' in scr and isinstance(scr.get('data'), str) and scr.get('data'):
+                        screenshots_models.append(SubTestScreenshot(
+                            type=scr['type'],
+                            data=scr['data'],
+                            label=scr.get('label')
+                        ))
+                        added = True
+
+                # If no valid path, try base64
+                if not added and i < len(base64s):
+                    scr = base64s[i]
+                    if isinstance(scr, dict) and isinstance(scr.get('data'), str) and scr.get('data'):
+                        screenshots_models.append(SubTestScreenshot(
+                            type=scr.get('type', 'base64'),
+                            data=scr['data'],
+                            label=scr.get('label')
+                        ))
 
             # Map status
             status_str = (s.get('status') or '').lower()
@@ -159,8 +199,8 @@ def to_subtest_result(self, name: str, language: str = 'zh-CN') -> SubTestResult:
             reports.append(SubTestReport(title='Summary', issues=self.current_case_data.get('final_summary', '')))
 
@@ -122,9 +137,34 @@ def to_subtest_result(self, name: str, language: str = 'zh-CN') -> SubTestResult
         for s in self.current_case_steps:
             # Convert screenshots
             screenshots_models: List[SubTestScreenshot] = []
-            for scr in s.get('screenshots', []) or []:
-                if isinstance(scr, dict) and scr.get('type') == 'base64' and isinstance(scr.get('data'), str):
-                    screenshots_models.append(SubTestScreenshot(type='base64', data=scr['data']))
+
+            paths = s.get('screenshots_paths', []) or []
+            base64s = s.get('screenshots', []) or []
+
+            # Use paths if available, otherwise base64 for each index
+            max_len = max(len(paths), len(base64s))
+            for i in range(max_len):
+                added = False
+                # Try to use path first
+                if i < len(paths):
+                    scr = paths[i]
+                    if isinstance(scr, dict) and 'type' in scr and isinstance(scr.get('data'), str) and scr.get('data'):
+                        screenshots_models.append(SubTestScreenshot(
+                            type=scr['type'],
+                            data=scr['data'],
+                            label=scr.get('label')
+                        ))
+                        added = True
+
+                # If no valid path, try base64
+                if not added and i < len(base64s):
+                    scr = base64s[i]
+                    if isinstance(scr, dict) and isinstance(scr.get('data'), str) and scr.get('data'):
+                        screenshots_models.append(SubTestScreenshot(
+                            type=scr.get('type', 'base64'),
+                            data=scr['data'],
+                            label=scr.get('label')
+                        ))

             # Map status
             status_str = (s.get('status') or '').lower()
@@ -159,8 +199,8 @@ def to_subtest_result(self, name: str, language: str = 'zh-CN') -> SubTestResult
             reports.append(SubTestReport(title='Summary', issues=self.current_case_data.get('final_summary', '')))

         # Extract case_id from case_info if available
-        case_info = self.current_case_data.get("case_info", {}) if self.current_case_data else {}
-        case_id = case_info.get("case_id", "") if isinstance(case_info, dict) else ""
+        case_info = self.current_case_data.get('case_info', {}) if self.current_case_data else {}
+        case_id = case_info.get('case_id', '') if isinstance(case_info, dict) else ''

         return SubTestResult(
             sub_test_id=case_id,
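The per-index precedence in `to_subtest_result` is easy to misread, so here is a self-contained sketch (dummy data) of the selection rule: a path entry with non-empty data wins its index, and base64 is the per-index fallback:

```python
paths   = [{'type': 'path', 'data': 'screenshots/step_0.png'},
           {'type': 'path', 'data': ''}]  # empty data -> falls back to base64
base64s = [{'type': 'base64', 'data': 'data:image/png;base64,AAA'},
           {'type': 'base64', 'data': 'data:image/png;base64,BBB'}]

chosen = []
for i in range(max(len(paths), len(base64s))):
    p = paths[i] if i < len(paths) else None
    if p and p.get('data'):
        chosen.append(p)
    elif i < len(base64s) and base64s[i].get('data'):
        chosen.append(base64s[i])

print(chosen)  # index 0 -> path entry, index 1 -> base64 fallback
```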
diff --git a/webqa_agent/testers/function_tester.py b/webqa_agent/testers/function_tester.py
index 05a234d..03dd426 100644
--- a/webqa_agent/testers/function_tester.py
+++ b/webqa_agent/testers/function_tester.py
@@ -99,6 +99,7 @@ async def action(self, test_step: str, file_path: str = None, viewport_only: boo
         all_execution_steps = []
         all_plans = []  # Collect all planning iterations for modelIO
         all_ordered_screenshots = []  # Collect all screenshots in chronological order
+        all_ordered_screenshots_paths = []  # Collect all screenshot paths in chronological order
         final_execution_result = {'success': False, 'message': 'No execution performed'}
         last_check_thought = None
         global_before_screenshot = None  # Will be assigned in the first iteration
@@ -116,7 +117,7 @@ async def action(self, test_step: str, file_path: str = None, viewport_only: boo
             # await dp_pre.remove_marker()

             # Take global before screenshot (not included in action step screenshots)
-            global_before_screenshot = await self._actions.b64_page_screenshot(
+            global_before_screenshot, global_before_screenshot_path = await self._actions.b64_page_screenshot(
                 full_page=full_page,
                 file_name='global_before_screenshot',
                 context='verify'
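`b64_page_screenshot` now returns a `(base64_data, file_path)` tuple, so every call site unpacks two values. A minimal runnable sketch of the new idiom, with a stub standing in for the real `ActionHandler` method (the stub's behavior is assumed):

```python
import asyncio
from typing import Optional, Tuple

async def b64_page_screenshot_stub(**_) -> Tuple[Optional[str], Optional[str]]:
    # Stand-in for ActionHandler.b64_page_screenshot; the path is None
    # when screenshots are kept as base64 only (save_screenshots: False).
    return 'data:image/png;base64,AAA', None

async def main() -> None:
    b64, path = await b64_page_screenshot_stub(file_name='demo', context='test')
    screenshots, screenshot_paths = [], []
    if b64:
        screenshots.append(b64)
        screenshot_paths.append(path)  # may be None; filtered later when building the step
    print(screenshots, screenshot_paths)

asyncio.run(main())
```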
@@ -161,6 +162,7 @@
                 'description': f'action: {test_step}',
                 'actions': all_execution_steps,
                 'screenshots': [],
+                'screenshots_paths': [],
                 'modelIO': '',
                 'status': 'failed',
                 'error': error_msg,
@@ -184,12 +186,13 @@
                 await self._actions.update_element_buffer(prev.raw_dict())

             # Take screenshot
-            marker_screenshot = await self._actions.b64_page_screenshot(
+            marker_screenshot, marker_screenshot_path = await self._actions.b64_page_screenshot(
                 full_page=full_page,
                 file_name=f'action_planning_marker_iter_{iteration}',
                 context='test'
             )
             all_ordered_screenshots.append(marker_screenshot)
+            all_ordered_screenshots_paths.append(marker_screenshot_path)

             # Remove marker
             await dp.remove_marker()
@@ -251,6 +254,7 @@
             for step in execution_steps:
                 if step.get('screenshot'):
                     all_ordered_screenshots.append(step.get('screenshot'))
+                    all_ordered_screenshots_paths.append(step.get('screenshot_path'))

             # Check if we should continue iterating
             if execution_result.get('check_result') == 'continue':
@@ -266,6 +270,7 @@
         # Ensure the before_screenshot is the global one from the very beginning
         if global_before_screenshot:
             execution_result['before_screenshot'] = global_before_screenshot
+            execution_result['before_screenshot_path'] = global_before_screenshot_path

         end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@@ -280,6 +285,7 @@
         # Aggregate screenshots: include only valid (non-None) images in the correct chronological order
         screenshots_list = [{'type': 'base64', 'data': ss} for ss in all_ordered_screenshots if ss]
+        screenshots_paths_list = [{'type': 'path', 'data': path} for path in all_ordered_screenshots_paths if path]

         # Build structure for case step format
         status_str = 'passed' if execution_result.get('success') else 'failed'
@@ -288,6 +294,7 @@
             'description': f'action: {test_step}',
             'actions': execution_steps,  # All actions aggregated together
             'screenshots': screenshots_list,  # All screenshots aggregated together
+            'screenshots_paths': screenshots_paths_list,  # All screenshot paths aggregated together
             'modelIO': json.dumps(all_plans, indent=2, ensure_ascii=False) if all_plans else '',
             'status': status_str,
             'start_time': start_time,
@@ -323,15 +330,18 @@
             # Safely get possibly undefined variables
             safe_all_ordered_screenshots = locals().get('all_ordered_screenshots', [])
+            safe_all_ordered_screenshots_paths = locals().get('all_ordered_screenshots_paths', [])
             safe_plan_json = locals().get('plan_json', {})

             # Build error case execution step dictionary structure
             error_screenshots = [{'type': 'base64', 'data': ss} for ss in safe_all_ordered_screenshots if ss]
+            error_screenshots_paths = [{'type': 'path', 'data': path} for path in safe_all_ordered_screenshots_paths if path]

             error_execution_steps = {
                 'description': f'action: {test_step}',
                 'actions': locals().get('all_execution_steps', []),
                 'screenshots': error_screenshots,
+                'screenshots_paths': error_screenshots_paths,
                 'modelIO': '',  # No valid model interaction output
                 'status': 'failed',
                 'error': str(e),
@@ -500,6 +510,7 @@ async def verify(
                 'description': f'verify: {assertion}',
                 'actions': [],
                 'screenshots': [],
+                'screenshots_paths': [],
                 'modelIO': json.dumps(skip_result, ensure_ascii=False),
                 'status': 'failed',
                 'start_time': start_time,
@@ -515,11 +526,15 @@
             # Extract before/after screenshots from execution_context
             before_screenshot = None
             after_screenshot = None
+            before_screenshot_path = None
+            after_screenshot_path = None

             if execution_context and execution_context.get('last_action'):
                 result = execution_context['last_action'].get('result', {})
                 before_screenshot = result.get('before_screenshot')
                 after_screenshot = result.get('after_screenshot')
+                before_screenshot_path = result.get('before_screenshot_path')
+                after_screenshot_path = result.get('after_screenshot_path')

             # Validate screenshots if present
             if before_screenshot and not isinstance(before_screenshot, str):
@@ -609,10 +624,17 @@
                 user_prompt = user_prompt + region_guidance

             # Store screenshots for step data
-            verification_screenshots = [
-                {'type': 'base64', 'data': before_screenshot, 'label': 'Before Action'},
-                {'type': 'base64', 'data': after_screenshot, 'label': 'After Action'}
-            ]
+            verification_screenshots = []
+            if before_screenshot:
+                verification_screenshots.append({'type': 'base64', 'data': before_screenshot, 'label': 'Before Action'})
+            if after_screenshot:
+                verification_screenshots.append({'type': 'base64', 'data': after_screenshot, 'label': 'After Action'})
+
+            verification_screenshots_paths = []
+            if before_screenshot_path:
+                verification_screenshots_paths.append({'type': 'path', 'data': before_screenshot_path, 'label': 'Before Action'})
+            if after_screenshot_path:
+                verification_screenshots_paths.append({'type': 'path', 'data': after_screenshot_path, 'label': 'After Action'})

         else:
             # ====================================================================
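The rewritten block above replaces the unconditional two-element list with guarded appends, so a missing side no longer yields a `{'data': None}` entry in the step data. A runnable reduction:

```python
from typing import List, Optional

def build_labeled(before: Optional[str], after: Optional[str]) -> List[dict]:
    # Mirror of the guarded construction above; the arguments below are dummies.
    shots = []
    if before:
        shots.append({'type': 'base64', 'data': before, 'label': 'Before Action'})
    if after:
        shots.append({'type': 'base64', 'data': after, 'label': 'After Action'})
    return shots

print(build_labeled(None, 'data:image/png;base64,BBB'))  # only the 'After Action' entry
```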
@@ -629,7 +651,7 @@ async def verify(
             await dp.crawl(highlight=False, filter_text=True, viewport_only=viewport_only)

             # Capture new screenshot
-            screenshot = await self._actions.b64_page_screenshot(
+            screenshot, screenshot_path = await self._actions.b64_page_screenshot(
                 full_page=full_page,
                 file_name='verification_clean',
                 context='test'
@@ -666,7 +688,7 @@ async def verify(
             # Store screenshot for step data
             verification_screenshots = [{'type': 'base64', 'data': screenshot}] if screenshot else []
-
+            verification_screenshots_paths = [{'type': 'path', 'data': screenshot_path}] if screenshot_path else []
         # ========================================================================
         # LLM CALL (unified for both modes)
         # ========================================================================
@@ -715,6 +737,7 @@
             'description': f'verify: {assertion}',
             'actions': verify_action_list,
             'screenshots': verification_screenshots,  # Use mode-specific screenshots
+            'screenshots_paths': verification_screenshots_paths,  # Use mode-specific screenshot paths
             'modelIO': result if isinstance(result, str) else json.dumps(result, ensure_ascii=False),
             'status': status_str,
             'start_time': start_time,
@@ -732,13 +755,14 @@
         # Try to get basic page information even if it fails
         try:
-            basic_screenshot = await self._actions.b64_page_screenshot(
+            basic_screenshot, basic_screenshot_path = await self._actions.b64_page_screenshot(
                 full_page=full_page,
                 file_name='assertion_failed',
                 context='error'
             )
         except:
             basic_screenshot = None
+            basic_screenshot_path = None

         end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@@ -746,6 +770,7 @@
             'description': f'verify: {assertion}',
             'actions': [],
             'screenshots': [{'type': 'base64', 'data': basic_screenshot}] if basic_screenshot else [],
+            'screenshots_paths': [{'type': 'path', 'data': basic_screenshot_path}] if basic_screenshot_path else [],
             'modelIO': '',
             'status': 'failed',
             'error': str(e),
@@ -892,7 +917,7 @@ async def _execute_plan(self, plan_json: Dict[str, Any], file_path: str = None,
         action_count = len(plan_json.get('actions', []))

         # Capture initial screenshot BEFORE any actions (plan-level before state)
-        initial_screenshot = await self._actions.b64_page_screenshot(
+        initial_screenshot, initial_screenshot_path = await self._actions.b64_page_screenshot(
             full_page=full_page,
             file_name='plan_initial_screenshot',
             context='verify'
@@ -934,8 +959,9 @@
             # Optimization: If Check action already returned a screenshot with markers, use it
             if action.get('type') == 'Check' and execution_result.get('screenshot'):
                 post_action_ss = execution_result.get('screenshot')
+                post_action_path = execution_result.get('screenshot_path')
             else:
-                post_action_ss = await self._actions.b64_page_screenshot(
+                post_action_ss, post_action_path = await self._actions.b64_page_screenshot(
                     file_name=f'action_{action_desc}_{index}',
                     context='test'
                 )
@@ -945,6 +971,7 @@
                 'success': success,
                 'message': message,
                 'screenshot': post_action_ss,
+                'screenshot_path': post_action_path,
                 'index': index,
             }
             if check_result:
@@ -958,7 +985,7 @@
             if not success:
                 logging.error(f'Action {index} failed: {message}')
                 # Capture final screenshot even on failure
-                final_screenshot = await self._actions.b64_page_screenshot(
+                final_screenshot, final_screenshot_path = await self._actions.b64_page_screenshot(
                     full_page=full_page,
                     file_name='plan_final_screenshot_failed',
                     context='verify'
@@ -970,7 +997,9 @@
                     after_action_url, after_action_title = '', ''
                 # Add plan-level screenshots and context to failure result
                 action_result['before_screenshot'] = initial_screenshot
+                action_result['before_screenshot_path'] = initial_screenshot_path
                 action_result['after_screenshot'] = final_screenshot
+                action_result['after_screenshot_path'] = final_screenshot_path
                 action_result['after_action_url'] = after_action_url
                 action_result['after_action_title'] = after_action_title
                 action_result['after_action_page_structure'] = ''  # May be empty in the failure scenario
@@ -988,13 +1017,14 @@
             logging.error(error_msg)
             # Capture final screenshot even on exception
             try:
-                final_screenshot = await self._actions.b64_page_screenshot(
+                final_screenshot, final_screenshot_path = await self._actions.b64_page_screenshot(
                     full_page=full_page,
                     file_name='plan_final_screenshot_exception',
                     context='verify'
                 )
             except:
                 final_screenshot = None
+                final_screenshot_path = None

             # Capture page context at exception time (for time-consistent verification)
             try:
@@ -1008,6 +1038,7 @@
                 'screenshot': None,
                 'before_screenshot': initial_screenshot,
                 'after_screenshot': final_screenshot,
+                'after_screenshot_path': final_screenshot_path,
                 'after_action_url': after_action_url,
                 'after_action_title': after_action_title,
                 'after_action_page_structure': ''  # May be empty in the exception scenario
@@ -1019,8 +1050,9 @@
         # Optimization: Reuse the screenshot from the last executed action if possible
         if execute_results and execute_results[-1].get('screenshot'):
             final_screenshot = execute_results[-1].get('screenshot')
+            final_screenshot_path = execute_results[-1].get('screenshot_path')
         else:
-            final_screenshot = await self._actions.b64_page_screenshot(
+            final_screenshot, final_screenshot_path = await self._actions.b64_page_screenshot(
                 full_page=full_page,
                 file_name='plan_final_screenshot',
                 context='verify'
@@ -1037,7 +1069,7 @@
             after_action_url, after_action_title = '', ''
             after_action_page_structure = ''

-        post_action_ss = await self._actions.b64_page_screenshot(
+        post_action_ss, _ = await self._actions.b64_page_screenshot(
             file_name='final_success',
             context='test'
         )
@@ -1048,6 +1080,7 @@
             'screenshot': post_action_ss,
             'before_screenshot': initial_screenshot,
             'after_screenshot': final_screenshot,
+            'after_screenshot_path': final_screenshot_path,
             'after_action_url': after_action_url,
             'after_action_title': after_action_title,
             'after_action_page_structure': after_action_page_structure,
@@ -1083,7 +1116,7 @@ async def _execute_plan_check(self, action: Dict[str, Any], viewport_only: bool
         page_type = getattr(curr, 'page_type', 'html')

         # Take screenshot with markers
-        marker_screenshot = await self._actions.b64_page_screenshot(
+        marker_screenshot, _ = await self._actions.b64_page_screenshot(
             full_page=full_page,
             file_name='check_action_marker',
             context='test'
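Several failure branches above read aggregation state via `locals().get(...)` because an exception can fire before those variables are ever bound; a tiny runnable demonstration of why that idiom is safe:

```python
def recover_after_failure(fail_early: bool) -> list:
    try:
        if fail_early:
            raise RuntimeError('boom before assignment')
        collected = ['shot_0', 'shot_1']
        raise RuntimeError('boom after assignment')
    except RuntimeError:
        # locals().get() tolerates names that were never assigned, where a
        # bare `collected` would raise UnboundLocalError in the early case.
        return locals().get('collected', [])

print(recover_after_failure(True))   # []
print(recover_after_failure(False))  # ['shot_0', 'shot_1']
```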
diff --git a/webqa_agent/testers/ux_tester.py b/webqa_agent/testers/ux_tester.py
index 26d1e87..8f284a2 100644
--- a/webqa_agent/testers/ux_tester.py
+++ b/webqa_agent/testers/ux_tester.py
@@ -363,7 +363,15 @@ async def _run_single_test(self, result: SubTestResult, user_case: str, id_map:
                 # if screenshot index (0-based), append corresponding screenshot and create step
                 screenshot_idx = issue.get('screenshotid')
                 if isinstance(screenshot_idx, int) and 0 <= screenshot_idx < len(browser_screenshot):
-                    screenshot_data = browser_screenshot[screenshot_idx]
+                    screenshot_item = browser_screenshot[screenshot_idx]
+
+                    # Extract base64 and path from the new dict format or the legacy string format
+                    if isinstance(screenshot_item, dict):
+                        screenshot_data = screenshot_item.get('base64')
+                        screenshot_path = screenshot_item.get('path')
+                    else:
+                        screenshot_data = screenshot_item
+                        screenshot_path = None

                     def _annotate_b64_image(image_b64: str, rect: List[int]) -> str:
                         if not (_PIL_AVAILABLE and isinstance(image_b64, str) and image_b64.startswith('data:image')):
@@ -387,29 +395,28 @@ def _annotate_b64_image(image_b64: str, rect: List[int]) -> str:
                         except Exception:
                             return image_b64

-                    annotated_b64 = None
                     screenshots = []
-                    if isinstance(screenshot_data, str):
-                        # Always include annotated (if possible) and original in order
-                        if coords is not None:
-                            annotated_b64 = _annotate_b64_image(screenshot_data, coords)
-                            screenshots.append(SubTestScreenshot(type='base64', data=annotated_b64))
-                            screenshots.append(SubTestScreenshot(type='base64', data=screenshot_data))
-                        else:
-                            # No coordinates -> include original only
-                            screenshots.append(SubTestScreenshot(type='base64', data=screenshot_data))
-                    elif isinstance(screenshot_data, dict):
-                        data_str = screenshot_data.get('data')
-                        if isinstance(data_str, str):
-                            if coords is not None:
-                                annotated_b64 = _annotate_b64_image(data_str, coords)
-                                screenshots.append(SubTestScreenshot(type='base64', data=annotated_b64))
-                                screenshots.append(SubTestScreenshot(type='base64', data=data_str))
-                            else:
-                                screenshots.append(SubTestScreenshot(type='base64', data=data_str))
-                        else:
-                            # Unable to annotate; include original dict
-                            screenshots.append(SubTestScreenshot(**screenshot_data))
+                    # 1. Annotated image (always base64 if it exists)
+                    if coords is not None and screenshot_data and isinstance(screenshot_data, str):
+                        annotated_b64 = _annotate_b64_image(screenshot_data, coords)
+                        if annotated_b64:
+                            screenshots.append(SubTestScreenshot(type='base64', data=annotated_b64, label='Annotated'))
+
+                    # 2. Original image (path)
+                    if screenshot_path and isinstance(screenshot_path, str):
+                        screenshots.append(SubTestScreenshot(type='path', data=screenshot_path, label='Original'))
+
+                    # 3. Original image (base64) - only if no path exists, to keep the JSON small,
+                    #    or when running in base64-only mode
+                    if not screenshot_path and screenshot_data:
+                        if isinstance(screenshot_data, str) and screenshot_data.startswith('data:image'):
+                            screenshots.append(SubTestScreenshot(type='base64', data=screenshot_data, label='Original'))
+                        elif isinstance(screenshot_data, dict) and screenshot_data.get('data'):
+                            # Handle the case where screenshot_data is already a dict
+                            screenshots.append(SubTestScreenshot(
+                                type=screenshot_data.get('type', 'base64'),
+                                data=screenshot_data['data']
+                            ))

                     # step status: all discovered issues are warnings
                     step_status = TestStatus.WARNING
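These hunks assume the scroll capture can now yield `{'base64': ..., 'path': ...}` dicts while older call sites may still pass bare base64 strings. A runnable sketch of the extraction (dummy values):

```python
from typing import Optional, Tuple, Union

def split_item(item: Union[dict, str]) -> Tuple[Optional[str], Optional[str]]:
    # The new dict format carries both representations; legacy strings have no path.
    if isinstance(item, dict):
        return item.get('base64'), item.get('path')
    return item, None

print(split_item({'base64': 'data:image/png;base64,AAA', 'path': 'shots/0.png'}))
print(split_item('data:image/png;base64,BBB'))  # ('data:image/png;base64,BBB', None)
```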
@@ -729,9 +736,20 @@ def _build_prompt(self, user_case: str, id_map: dict, screenshot_count: int = 0)

     async def _get_llm_response(self, prompt: str, page_img: bool, browser_screenshot=None):
         if page_img and browser_screenshot:
+            # Extract base64 data for LLM if it's a list of dicts from ScrollHandler
+            llm_images = []
+            if isinstance(browser_screenshot, list):
+                for item in browser_screenshot:
+                    if isinstance(item, dict) and 'base64' in item:
+                        llm_images.append(item['base64'])
+                    else:
+                        llm_images.append(item)
+            else:
+                llm_images = browser_screenshot
+
             return await self.llm.get_llm_response(
                 LLMPrompt.page_default_prompt,
                 prompt,
-                images=browser_screenshot,
+                images=llm_images,
             )
         return await self.llm.get_llm_response(LLMPrompt.page_default_prompt, prompt)
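Finally, `_get_llm_response` unwraps the same dict format before calling the model, since the LLM client expects raw base64 strings. The loop above reduces to this comprehension (dummy data; behavior shown for both item shapes):

```python
browser_screenshot = [
    {'base64': 'data:image/png;base64,AAA', 'path': 'shots/0.png'},  # new dict format
    'data:image/png;base64,BBB',                                     # legacy string format
]
llm_images = [
    item['base64'] if isinstance(item, dict) and 'base64' in item else item
    for item in browser_screenshot
]
print(llm_images)  # ['data:image/png;base64,AAA', 'data:image/png;base64,BBB']
```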