diff --git a/.gitignore b/.gitignore index b49ad98..beb446c 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,6 @@ Thumbs.db gui/__pycache__ __pycache__ -Testfiles -Testfiles/ + +Testfiles/TestOutputs Testfiles_ diff --git a/AUTOTEST_GUI_PLAN.md b/AUTOTEST_GUI_PLAN.md new file mode 100644 index 0000000..0296dd1 --- /dev/null +++ b/AUTOTEST_GUI_PLAN.md @@ -0,0 +1,112 @@ +# Plan for Autotest GUI Mode Implementation + +**I. Objective:** +Create an `autotest.py` script that can launch the Asset Processor GUI headlessly, load a predefined asset (`.zip`), select a predefined preset, verify the predicted rule structure against an expected JSON, trigger processing to a predefined output directory, check the output, and analyze logs for errors or specific messages. This serves as a sanity check for core GUI-driven workflows. + +**II. `TestFiles` Directory:** +A new directory named `TestFiles` will be created in the project root (`c:/Users/Theis/Assetprocessor/Asset-Frameworker/TestFiles/`). This directory will house: +* Sample asset `.zip` files for testing (e.g., `TestFiles/SampleAsset1.zip`). +* Expected rule structure JSON files (e.g., `TestFiles/SampleAsset1_PresetX_expected_rules.json`). +* A subdirectory for test outputs (e.g., `TestFiles/TestOutputs/`). + +**III. `autotest.py` Script:** + +1. **Location:** `c:/Users/Theis/Assetprocessor/Asset-Frameworker/autotest.py` (or `scripts/autotest.py`). +2. **Command-Line Arguments (with defaults pointing to `TestFiles/`):** + * `--zipfile`: Path to the test asset. Default: `TestFiles/default_test_asset.zip`. + * `--preset`: Name of the preset. Default: `DefaultTestPreset`. + * `--expectedrules`: Path to expected rules JSON. Default: `TestFiles/default_test_asset_rules.json`. + * `--outputdir`: Path for processing output. Default: `TestFiles/TestOutputs/DefaultTestOutput`. + * `--search` (optional): Log search term. Default: `None`. + * `--additional-lines` (optional): Context lines for log search. Default: `0`. +3. **Core Structure:** + * Imports necessary modules from the main application and PySide6. + * Adds project root to `sys.path` for imports. + * `AutoTester` class: + * **`__init__(self, app_instance: App)`:** + * Stores `app_instance` and `main_window`. + * Initializes `QEventLoop`. + * Connects `app_instance.all_tasks_finished` to `self._on_all_tasks_finished`. + * Loads expected rules from the `--expectedrules` file. + * **`run_test(self)`:** Orchestrates the test steps sequentially: + 1. Load ZIP (`main_window.add_input_paths()`). + 2. Select Preset (`main_window.preset_editor_widget.editor_preset_list.setCurrentItem()`). + 3. Await Prediction (using `QTimer` to poll `main_window._pending_predictions`, manage with `QEventLoop`). + 4. Retrieve & Compare Rulelist: + * Get actual rules: `main_window.unified_model.get_all_source_rules()`. + * Convert actual rules to comparable dict (`_convert_rules_to_comparable()`). + * Compare with loaded expected rules (`_compare_rules()`). If mismatch, log and fail. + 5. Start Processing (emit `main_window.start_backend_processing` with rules and output settings). + 6. Await Processing (use `QEventLoop` waiting for `_on_all_tasks_finished`). + 7. Check Output Path (verify existence of output dir, list contents, basic sanity checks like non-emptiness or presence of key asset folders). + 8. Retrieve & Analyze Logs (`main_window.log_console.log_console_output.toPlainText()`, filter by `--search`, check for tracebacks). + 9. Report result and call `cleanup_and_exit()`. + * **`_check_prediction_status(self)`:** Slot for prediction polling timer. + * **`_on_all_tasks_finished(self, processed_count, skipped_count, failed_count)`:** Slot for `App.all_tasks_finished` signal. + * **`_convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> dict`:** Converts `SourceRule` objects to the JSON structure defined below. + * **`_compare_rules(self, actual_rules_data: dict, expected_rules_data: dict) -> bool`:** Implements Option 1 comparison logic: + * Errors if an expected field is missing or its value mismatches. + * Logs (but doesn't error on) fields present in actual but not in expected. + * **`_process_and_display_logs(self, logs_text: str)`:** Handles log filtering/display. + * **`cleanup_and_exit(self, success=True)`:** Quits `QCoreApplication` and `sys.exit()`. + * `main()` function: + * Parses CLI arguments. + * Initializes `QApplication`. + * Instantiates `main.App()` (does *not* show the GUI). + * Instantiates `AutoTester(app_instance)`. + * Uses `QTimer.singleShot(0, tester.run_test)` to start the test. + * Runs `q_app.exec()`. + +**IV. `expected_rules.json` Structure (Revised):** +Located in `TestFiles/`. Example: `TestFiles/SampleAsset1_PresetX_expected_rules.json`. +```json +{ + "source_rules": [ + { + "input_path": "SampleAsset1.zip", + "supplier_identifier": "ExpectedSupplier", + "preset_name": "PresetX", + "assets": [ + { + "asset_name": "AssetNameFromPrediction", + "asset_type": "Prop", + "files": [ + { + "file_path": "relative/path/to/file1.png", + "item_type": "MAP_COL", + "target_asset_name_override": null + } + ] + } + ] + } + ] +} +``` + +**V. Mermaid Diagram of Autotest Flow:** +```mermaid +graph TD + A[Start autotest.py with CLI Args (defaults to TestFiles/)] --> B{Setup Args & Logging}; + B --> C[Init QApplication & main.App (GUI Headless)]; + C --> D[Instantiate AutoTester(app_instance)]; + D --> E[QTimer.singleShot -> AutoTester.run_test()]; + + subgraph AutoTester.run_test() + E --> F[Load Expected Rules from --expectedrules JSON]; + F --> G[Load ZIP (--zipfile) via main_window.add_input_paths()]; + G --> H[Select Preset (--preset) via main_window.preset_editor_widget]; + H --> I[Await Prediction (Poll main_window._pending_predictions via QTimer & QEventLoop)]; + I -- Prediction Done --> J[Get Actual Rules from main_window.unified_model]; + J --> K[Convert Actual Rules to Comparable JSON Structure]; + K --> L{Compare Actual vs Expected Rules (Option 1 Logic)}; + L -- Match --> M[Start Processing (Emit main_window.start_backend_processing with --outputdir)]; + L -- Mismatch --> ZFAIL[Log Mismatch & Call cleanup_and_exit(False)]; + M --> N[Await Processing (QEventLoop for App.all_tasks_finished signal)]; + N -- Processing Done --> O[Check Output Dir (--outputdir): Exists? Not Empty? Key Asset Folders?]; + O --> P[Retrieve & Analyze Logs (Search, Tracebacks)]; + P --> Q[Log Test Success & Call cleanup_and_exit(True)]; + end + + ZFAIL --> ZEND[AutoTester.cleanup_and_exit() -> QCoreApplication.quit() & sys.exit()]; + Q --> ZEND; \ No newline at end of file diff --git a/Documentation/01_User_Guide/11_Usage_Autotest.md b/Documentation/01_User_Guide/11_Usage_Autotest.md new file mode 100644 index 0000000..d4ee010 --- /dev/null +++ b/Documentation/01_User_Guide/11_Usage_Autotest.md @@ -0,0 +1,83 @@ +# User Guide: Usage - Automated GUI Testing (`autotest.py`) + +This document explains how to use the `autotest.py` script for automated sanity checks of the Asset Processor Tool's GUI-driven workflow. + +## Overview + +The `autotest.py` script provides a way to run predefined test scenarios headlessly (without displaying the GUI). It simulates the core user actions: loading an asset, selecting a preset, allowing rules to be predicted, processing the asset, and then checks the results against expectations. This is primarily intended as a developer tool for regression testing and ensuring core functionality remains stable. + +## Running the Autotest Script + +From the project root directory, you can run the script using Python: + +```bash +python autotest.py [OPTIONS] +``` + +### Command-Line Options + +The script accepts several command-line arguments to configure the test run. If not provided, they use predefined default values. + +* `--zipfile PATH_TO_ZIP`: + * Specifies the path to the input asset `.zip` file to be used for the test. + * Default: `TestFiles/BoucleChunky001.zip` +* `--preset PRESET_NAME`: + * Specifies the name of the preset to be selected and used for rule prediction and processing. + * Default: `Dinesen` +* `--expectedrules PATH_TO_JSON`: + * Specifies the path to a JSON file containing the expected rule structure that should be generated after the preset is applied to the input asset. + * Default: `TestFiles/test-BoucleChunky001.json` +* `--outputdir PATH_TO_DIR`: + * Specifies the directory where the processed assets will be written. + * Default: `TestFiles/TestOutputs/DefaultTestOutput` +* `--search "SEARCH_TERM"` (optional): + * A string to search for within the application logs generated during the test run. If found, matching log lines (with context) will be highlighted. + * Default: None +* `--additional-lines NUM_LINES` (optional): + * When using `--search`, this specifies how many lines of context before and after each matching log line should be displayed. + * Default: `0` + +**Example Usage:** + +```bash +# Run with default test files and settings +python autotest.py + +# Run with specific test files and search for a log message +python autotest.py --zipfile TestFiles/MySpecificAsset.zip --preset MyPreset --expectedrules TestFiles/MySpecificAsset_rules.json --outputdir TestFiles/TestOutputs/MySpecificOutput --search "Processing complete for asset" +``` + +## `TestFiles` Directory + +The autotest script relies on a directory named `TestFiles` located in the project root. This directory should contain: + +* **Test Asset `.zip` files:** The actual asset archives used as input for tests (e.g., `default_test_asset.zip`, `MySpecificAsset.zip`). +* **Expected Rules `.json` files:** JSON files defining the expected rule structure for a given asset and preset combination (e.g., `default_test_asset_rules.json`, `MySpecificAsset_rules.json`). The structure of this file is detailed in the main autotest plan (`AUTOTEST_GUI_PLAN.md`). +* **`TestOutputs/` subdirectory:** This is the default parent directory where the autotest script will create specific output folders for each test run (e.g., `TestFiles/TestOutputs/DefaultTestOutput/`). + +## Test Workflow + +When executed, `autotest.py` performs the following steps: + +1. **Initialization:** Parses command-line arguments and initializes the main application components headlessly. +2. **Load Expected Rules:** Loads the `expected_rules.json` file. +3. **Load Asset:** Loads the specified `.zip` file into the application. +4. **Select Preset:** Selects the specified preset. This triggers the internal rule prediction process. +5. **Await Prediction:** Waits for the rule prediction to complete. +6. **Compare Rules:** Retrieves the predicted rules from the application and compares them against the loaded expected rules. If there's a mismatch, the test typically fails at this point. +7. **Start Processing:** If the rules match, it initiates the asset processing pipeline, directing output to the specified output directory. +8. **Await Processing:** Waits for all backend processing tasks to complete. +9. **Check Output:** Verifies the existence of the output directory and lists its contents. Basic checks ensure some output was generated. +10. **Analyze Logs:** Retrieves logs from the application. If a search term was provided, it filters and displays relevant log portions. It also checks for Python tracebacks, which usually indicate a failure. +11. **Report Result:** Prints a summary of the test outcome (success or failure) and exits with an appropriate status code (0 for success, 1 for failure). + +## Interpreting Results + +* **Console Output:** The script will log its progress and the results of each step to the console. +* **Log Analysis:** Pay attention to the log output, especially if a `--search` term was used or if any tracebacks are reported. +* **Exit Code:** + * `0`: Test completed successfully. + * `1`: Test failed at some point (e.g., rule mismatch, processing error, traceback found). +* **Output Directory:** Inspect the contents of the specified output directory to manually verify the processed assets if needed. + +This automated test helps ensure the stability of the core processing logic when driven by GUI-equivalent actions. \ No newline at end of file diff --git a/TestFiles/BoucleChunky001.zip b/TestFiles/BoucleChunky001.zip new file mode 100644 index 0000000..cbd6d0b Binary files /dev/null and b/TestFiles/BoucleChunky001.zip differ diff --git a/TestFiles/Test-BoucleChunky001.json b/TestFiles/Test-BoucleChunky001.json new file mode 100644 index 0000000..770d458 --- /dev/null +++ b/TestFiles/Test-BoucleChunky001.json @@ -0,0 +1,57 @@ +{ + "source_rules": [ + { + "input_path": "BoucleChunky001.zip", + "supplier_identifier": "Dinesen", + "preset_name": null, + "assets": [ + { + "asset_name": "BoucleChunky001", + "asset_type": "Surface", + "files": [ + { + "file_path": "BoucleChunky001_AO_1K_METALNESS.png", + "item_type": "MAP_AO", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_COL_1K_METALNESS.png", + "item_type": "MAP_COL", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_DISP16_1K_METALNESS.png", + "item_type": "MAP_DISP", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_DISP_1K_METALNESS.png", + "item_type": "MAP_DISP", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_Fabric.png", + "item_type": "EXTRA", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_METALNESS_1K_METALNESS.png", + "item_type": "MAP_METAL", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_NRM_1K_METALNESS.png", + "item_type": "MAP_NRM", + "target_asset_name_override": "BoucleChunky001" + }, + { + "file_path": "BoucleChunky001_ROUGHNESS_1K_METALNESS.png", + "item_type": "MAP_ROUGH", + "target_asset_name_override": "BoucleChunky001" + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/autotest.py b/autotest.py new file mode 100644 index 0000000..1782ef2 --- /dev/null +++ b/autotest.py @@ -0,0 +1,863 @@ +import argparse +import sys +import logging +import logging.handlers +import time +import json +import shutil # Import shutil for directory operations +from pathlib import Path +from typing import List, Dict, Any + +from PySide6.QtCore import QCoreApplication, QTimer, Slot, QEventLoop, QObject, Signal +from PySide6.QtWidgets import QApplication, QListWidgetItem + +# Add project root to sys.path +project_root = Path(__file__).resolve().parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +try: + from main import App + from gui.main_window import MainWindow + from rule_structure import SourceRule # Assuming SourceRule is in rule_structure.py +except ImportError as e: + print(f"Error importing project modules: {e}") + print(f"Ensure that the script is run from the project root or that the project root is in PYTHONPATH.") + print(f"Current sys.path: {sys.path}") + sys.exit(1) + +# Global variable for the memory log handler +autotest_memory_handler = None + +# Custom Log Filter for Concise Output +class InfoSummaryFilter(logging.Filter): + # Keywords that identify INFO messages to *allow* for concise output + SUMMARY_KEYWORDS_PRECISE = [ + "Test run completed", + "Test succeeded", + "Test failed", + "Rule comparison successful", + "Rule comparison failed", + "ProcessingEngine finished. Summary:", + "Autotest Context:", + "Parsed CLI arguments:", + "Prediction completed successfully.", + "Processing completed.", + "Signal 'all_tasks_finished' received", + "final status:", # To catch "Asset '...' final status:" + "User settings file not found:", + "MainPanelWidget: Default output directory set to:", + # Search related (as per original filter) + "Searching logs for term", + "Search term ", + "Found ", + "No tracebacks found in the logs.", + "--- End Log Analysis ---", + "Log analysis completed.", + ] + # Patterns for case-insensitive rejection + REJECT_PATTERNS_LOWER = [ + # Original debug prefixes (ensure these are still relevant or merge if needed) + "debug:", "orchestrator_trace:", "configuration_debug:", "app_debug:", "output_org_debug:", + # Iterative / Per-item / Per-file details / Intermediate steps + ": item ", # Catches "Asset '...', Item X/Y" + "item successfully processed and saved", + ", file '", # Catches "Asset '...', File '...'" + ": processing regular map", + ": found source file:", + ": determined source bit depth:", + "successfully processed regular map", + "successfully created mergetaskdefinition", + ": preparing processing items", + ": finished preparing items. found", + ": starting core item processing loop", + ", task '", + ": processing merge task", + "loaded from context:", + "using dimensions from first loaded input", + "successfully merged inputs into image", + "successfully processed merge task", + "mergedtaskprocessorstage result", + "calling savevariantsstage", + "savevariantsstage result", + "adding final details to context", + ": finished core item processing loop", + ": copied variant", + ": copied extra file", + ": successfully organized", + ": output organization complete.", + ": metadata saved to", + "worker thread: starting processing for rule:", + "preparing workspace for input:", + "input is a supported archive", + "calling processingengine.process with rule", + "calculated sha5 for", + "calculated next incrementing value for", + "verify: processingengine.process called", + ": effective supplier set to", + ": metadata initialized.", + ": file rules queued for processing", + "successfully loaded base application settings", + "successfully loaded and merged asset_type_definitions", + "successfully loaded and merged file_type_definitions", + "starting rule-based prediction for:", + "rule-based prediction finished successfully for", + "finished rule-based prediction run for", + "updating model with rule-based results for source:", + "debug task ", + "worker thread: finished processing for rule:", + "task finished signal received for", + # Autotest step markers (not global summaries) + "step 1: loading zip file:", + "step 2: selecting preset:", + "step 4: retrieving and comparing rules...", + "step 5: starting processing...", + "step 7: checking output path:", + "output path check completed.", + ] + + def filter(self, record): + # Allow CRITICAL, ERROR, WARNING unconditionally + if record.levelno >= logging.WARNING: + return True + + if record.levelno == logging.INFO: + msg = record.getMessage() + msg_lower = msg.lower() # For case-insensitive pattern rejection + + # 1. Explicitly REJECT if message contains verbose patterns (case-insensitive) + for pattern in self.REJECT_PATTERNS_LOWER: # Use the new list + if pattern in msg_lower: + return False # Reject + + # 2. Then, if not rejected, ALLOW only if message contains precise summary keywords + for keyword in self.SUMMARY_KEYWORDS_PRECISE: # Use the new list + if keyword in msg: # Original message for case-sensitive summary keywords if needed + return True # Allow + + # 3. Reject all other INFO messages that don't match precise summary keywords + return False + + # Reject levels below INFO (e.g., DEBUG) by default for this handler + return False + +# --- Root Logger Configuration for Concise Console Output --- +def setup_autotest_logging(): + """ + Configures the root logger for concise console output for autotest.py. + This ensures that only essential summary information, warnings, and errors + are displayed on the console by default. + """ + root_logger = logging.getLogger() + + # 1. Remove all existing handlers from the root logger. + # This prevents interference from other logging configurations. + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + handler.close() # Close handler before removing + + # 2. Set the root logger's level to DEBUG to capture everything for the memory handler. + # The console handler will still filter down to INFO/selected. + root_logger.setLevel(logging.DEBUG) # Changed from INFO to DEBUG + + # 3. Create a new StreamHandler for sys.stdout (for concise console output). + console_handler = logging.StreamHandler(sys.stdout) + + # 4. Set this console handler's level to INFO. + # The filter will then decide which INFO messages to display on console. + console_handler.setLevel(logging.INFO) + + # 5. Apply the enhanced InfoSummaryFilter to the console handler. + info_filter = InfoSummaryFilter() + console_handler.addFilter(info_filter) + + # 6. Set a concise formatter for the console handler. + formatter = logging.Formatter('[%(levelname)s] %(message)s') + console_handler.setFormatter(formatter) + + # 7. Add this newly configured console handler to the root_logger. + root_logger.addHandler(console_handler) + + # 8. Setup the MemoryHandler + global autotest_memory_handler # Declare usage of global + autotest_memory_handler = logging.handlers.MemoryHandler( + capacity=20000, # Increased capacity + flushLevel=logging.CRITICAL + 1, # Prevent automatic flushing + target=None # Does not flush to another handler + ) + autotest_memory_handler.setLevel(logging.DEBUG) # Capture all logs from DEBUG up + # Not adding a formatter here, will format in _process_and_display_logs + + # 9. Add the memory handler to the root logger. + root_logger.addHandler(autotest_memory_handler) + +# Call the setup function early in the script's execution. +setup_autotest_logging() + +# Logger for autotest.py's own messages. +# Messages from this logger will propagate to the root logger and be filtered +# by the console_handler configured above. +# Setting its level to DEBUG allows autotest.py to generate DEBUG messages, +# which won't appear on the concise console (due to handler's INFO level) +# but can be captured by other handlers (e.g., the GUI's log console). +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) # Ensure autotest.py can generate DEBUGs for other handlers + +# Note: The GUI's log console (e.g., self.main_window.log_console.log_console_output) +# is assumed to capture all logs (including DEBUG) from various modules. +# The _process_and_display_logs function then uses these comprehensive logs for the --search feature. +# This root logger setup primarily makes autotest.py's direct console output concise, +# ensuring that only filtered, high-level information appears on stdout by default. +# --- End of Root Logger Configuration --- + +# --- Argument Parsing --- +def parse_arguments(): + """Parses command-line arguments for the autotest script.""" + parser = argparse.ArgumentParser(description="Automated test script for Asset Processor GUI.") + parser.add_argument( + "--zipfile", + type=Path, + default=project_root / "TestFiles" / "BoucleChunky001.zip", + help="Path to the test asset ZIP file. Default: TestFiles/BoucleChunky001.zip" + ) + parser.add_argument( + "--preset", + type=str, + default="Dinesen", # This should match a preset name in the application + help="Name of the preset to use. Default: Dinesen" + ) + parser.add_argument( + "--expectedrules", + type=Path, + default=project_root / "TestFiles" / "Test-BoucleChunky001.json", + help="Path to the JSON file with expected rules. Default: TestFiles/Test-BoucleChunky001.json" + ) + parser.add_argument( + "--outputdir", + type=Path, + default=project_root / "TestFiles" / "TestOutputs" / "BoucleChunkyOutput", + help="Path for processing output. Default: TestFiles/TestOutputs/BoucleChunkyOutput" + ) + parser.add_argument( + "--search", + type=str, + default=None, + help="Optional log search term. Default: None" + ) + parser.add_argument( + "--additional-lines", + type=int, + default=0, + help="Context lines for log search. Default: 0" + ) + return parser.parse_args() + +class AutoTester(QObject): + """ + Handles the automated testing process for the Asset Processor GUI. + """ + # Define signals if needed, e.g., for specific test events + # test_step_completed = Signal(str) + + def __init__(self, app_instance: App, cli_args: argparse.Namespace): + super().__init__() + self.app_instance: App = app_instance + self.main_window: MainWindow = app_instance.main_window + self.cli_args: argparse.Namespace = cli_args + self.event_loop = QEventLoop(self) + self.prediction_poll_timer = QTimer(self) + self.expected_rules_data: Dict[str, Any] = {} + self.test_step: str = "INIT" # Possible values: INIT, LOADING_ZIP, SELECTING_PRESET, AWAITING_PREDICTION, PREDICTION_COMPLETE, COMPARING_RULES, STARTING_PROCESSING, AWAITING_PROCESSING, PROCESSING_COMPLETE, CHECKING_OUTPUT, ANALYZING_LOGS, DONE + + if not self.main_window: + logger.error("MainWindow instance not found in App. Cannot proceed.") + self.cleanup_and_exit(success=False) + return + + # Connect signals + if hasattr(self.app_instance, 'all_tasks_finished') and isinstance(self.app_instance.all_tasks_finished, Signal): + self.app_instance.all_tasks_finished.connect(self._on_all_tasks_finished) + else: + logger.warning("App instance does not have 'all_tasks_finished' signal or it's not a Signal. Processing completion might not be detected.") + + self._load_expected_rules() + + def _load_expected_rules(self) -> None: + """Loads the expected rules from the JSON file specified by cli_args.""" + self.test_step = "LOADING_EXPECTED_RULES" + logger.debug(f"Loading expected rules from: {self.cli_args.expectedrules}") + try: + with open(self.cli_args.expectedrules, 'r') as f: + self.expected_rules_data = json.load(f) + logger.debug("Expected rules loaded successfully.") + except FileNotFoundError: + logger.error(f"Expected rules file not found: {self.cli_args.expectedrules}") + self.cleanup_and_exit(success=False) + except json.JSONDecodeError as e: + logger.error(f"Error decoding expected rules JSON: {e}") + self.cleanup_and_exit(success=False) + except Exception as e: + logger.error(f"An unexpected error occurred while loading expected rules: {e}") + self.cleanup_and_exit(success=False) + + def run_test(self) -> None: + """Orchestrates the test steps.""" + logger.info("Starting test run...") + + if not self.expected_rules_data: # Ensure rules were loaded + logger.error("Expected rules not loaded. Aborting test.") + self.cleanup_and_exit(success=False) + return + + # Add a specific summary log for essential context + logger.info(f"Autotest Context: Input='{self.cli_args.zipfile.name}', Preset='{self.cli_args.preset}', Output='{self.cli_args.outputdir}'") + + # Step 1: Load ZIP + self.test_step = "LOADING_ZIP" + logger.info(f"Step 1: Loading ZIP file: {self.cli_args.zipfile}") # KEEP INFO - Passes filter + if not self.cli_args.zipfile.exists(): + logger.error(f"ZIP file not found: {self.cli_args.zipfile}") + self.cleanup_and_exit(success=False) + return + try: + # Assuming add_input_paths can take a list of strings or Path objects + self.main_window.add_input_paths([str(self.cli_args.zipfile)]) + logger.debug("ZIP file loading initiated.") + except Exception as e: + logger.error(f"Error during ZIP file loading: {e}") + self.cleanup_and_exit(success=False) + return + + # Step 2: Select Preset + self.test_step = "SELECTING_PRESET" + logger.info(f"Step 2: Selecting preset: {self.cli_args.preset}") # KEEP INFO - Passes filter + preset_found = False + preset_list_widget = self.main_window.preset_editor_widget.editor_preset_list + for i in range(preset_list_widget.count()): + item = preset_list_widget.item(i) + if item and item.text() == self.cli_args.preset: + preset_list_widget.setCurrentItem(item) + logger.debug(f"Preset '{self.cli_args.preset}' selected.") + preset_found = True + break + if not preset_found: + logger.error(f"Preset '{self.cli_args.preset}' not found in the list.") + available_presets = [preset_list_widget.item(i).text() for i in range(preset_list_widget.count())] + logger.debug(f"Available presets: {available_presets}") + self.cleanup_and_exit(success=False) + return + + # Step 3: Await Prediction Completion + self.test_step = "AWAITING_PREDICTION" + logger.debug("Step 3: Awaiting prediction completion...") + self.prediction_poll_timer.timeout.connect(self._check_prediction_status) + self.prediction_poll_timer.start(500) # Poll every 500ms + + # Use a QTimer to allow event loop to process while waiting for this step + # This ensures that the _check_prediction_status can be called. + # We will exit this event_loop from _check_prediction_status when prediction is done. + logger.debug("Starting event loop for prediction...") + self.event_loop.exec() # This loop is quit by _check_prediction_status + self.prediction_poll_timer.stop() + logger.debug("Event loop for prediction finished.") + + + if self.test_step != "PREDICTION_COMPLETE": + logger.error(f"Prediction did not complete as expected. Current step: {self.test_step}") + # Check if there were any pending predictions that never cleared + if hasattr(self.main_window, '_pending_predictions'): + logger.error(f"Pending predictions at timeout: {self.main_window._pending_predictions}") + self.cleanup_and_exit(success=False) + return + logger.info("Prediction completed successfully.") # KEEP INFO - Passes filter + + # Step 4: Retrieve & Compare Rulelist + self.test_step = "COMPARING_RULES" + logger.info("Step 4: Retrieving and Comparing Rules...") # KEEP INFO - Passes filter + actual_source_rules_list: List[SourceRule] = self.main_window.unified_model.get_all_source_rules() + actual_rules_obj = actual_source_rules_list # Keep the SourceRule list for processing + + comparable_actual_rules = self._convert_rules_to_comparable(actual_source_rules_list) + + if not self._compare_rules(comparable_actual_rules, self.expected_rules_data): + logger.error("Rule comparison failed. See logs for details.") + self.cleanup_and_exit(success=False) + return + logger.info("Rule comparison successful.") # KEEP INFO - Passes filter + + # Step 5: Start Processing + self.test_step = "START_PROCESSING" + logger.info("Step 5: Starting Processing...") # KEEP INFO - Passes filter + processing_settings = { + "output_dir": str(self.cli_args.outputdir), # Ensure it's a string for JSON/config + "overwrite": True, + "workers": 1, + "blender_enabled": False # Basic test, no Blender + } + try: + Path(self.cli_args.outputdir).mkdir(parents=True, exist_ok=True) + logger.debug(f"Ensured output directory exists: {self.cli_args.outputdir}") + except Exception as e: + logger.error(f"Could not create output directory {self.cli_args.outputdir}: {e}") + self.cleanup_and_exit(success=False) + return + + if hasattr(self.main_window, 'start_backend_processing') and isinstance(self.main_window.start_backend_processing, Signal): + logger.debug(f"Emitting start_backend_processing with rules count: {len(actual_rules_obj)} and settings: {processing_settings}") + self.main_window.start_backend_processing.emit(actual_rules_obj, processing_settings) + else: + logger.error("'start_backend_processing' signal not found on MainWindow. Cannot start processing.") + self.cleanup_and_exit(success=False) + return + + # Step 6: Await Processing Completion + self.test_step = "AWAIT_PROCESSING" + logger.debug("Step 6: Awaiting processing completion...") + self.event_loop.exec() # This loop is quit by _on_all_tasks_finished + + if self.test_step != "PROCESSING_COMPLETE": + logger.error(f"Processing did not complete as expected. Current step: {self.test_step}") + self.cleanup_and_exit(success=False) + return + logger.info("Processing completed.") # KEEP INFO - Passes filter + + # Step 7: Check Output Path + self.test_step = "CHECK_OUTPUT" + logger.info(f"Step 7: Checking output path: {self.cli_args.outputdir}") # KEEP INFO - Passes filter + output_path = Path(self.cli_args.outputdir) + if not output_path.exists() or not output_path.is_dir(): + logger.error(f"Output directory {output_path} does not exist or is not a directory.") + self.cleanup_and_exit(success=False) + return + + output_items = list(output_path.iterdir()) + if not output_items: + logger.warning(f"Output directory {output_path} is empty. This might be a test failure depending on the case.") + # For a more specific check, one might iterate through actual_rules_obj + # and verify if subdirectories matching asset_name exist. + # e.g. for asset_rule in source_rule.assets: + # expected_asset_dir = output_path / asset_rule.asset_name + # if not expected_asset_dir.is_dir(): logger.error(...) + else: + logger.debug(f"Found {len(output_items)} item(s) in output directory:") + for item in output_items: + logger.debug(f" - {item.name} ({'dir' if item.is_dir() else 'file'})") + logger.info("Output path check completed.") # KEEP INFO - Passes filter + + # Step 8: Retrieve & Analyze Logs + self.test_step = "CHECK_LOGS" + logger.debug("Step 8: Retrieving and Analyzing Logs...") + all_logs_text = "" + if self.main_window.log_console and self.main_window.log_console.log_console_output: + all_logs_text = self.main_window.log_console.log_console_output.toPlainText() + else: + logger.warning("Log console or output widget not found. Cannot retrieve logs.") + + self._process_and_display_logs(all_logs_text) + logger.info("Log analysis completed.") + + # Final Step + logger.info("Test run completed successfully.") # KEEP INFO - Passes filter + self.cleanup_and_exit(success=True) + + @Slot() + def _check_prediction_status(self) -> None: + """Polls the main window for pending predictions.""" + # logger.debug(f"Checking prediction status. Pending: {self.main_window._pending_predictions if hasattr(self.main_window, '_pending_predictions') else 'N/A'}") + if hasattr(self.main_window, '_pending_predictions'): + if not self.main_window._pending_predictions: # Assuming _pending_predictions is a list/dict that's empty when done + logger.debug("No pending predictions. Prediction assumed complete.") + self.test_step = "PREDICTION_COMPLETE" + if self.event_loop.isRunning(): + self.event_loop.quit() + # else: + # logger.debug(f"Still awaiting predictions: {len(self.main_window._pending_predictions)} remaining.") + else: + logger.warning("'_pending_predictions' attribute not found on MainWindow. Cannot check prediction status automatically.") + # As a fallback, if the attribute is missing, we might assume prediction is instant or needs manual check. + # For now, let's assume it means it's done if the attribute is missing, but this is risky. + # A better approach would be to have a clear signal from MainWindow when predictions are done. + self.test_step = "PREDICTION_COMPLETE" # Risky assumption + if self.event_loop.isRunning(): + self.event_loop.quit() + + + @Slot(int, int, int) + def _on_all_tasks_finished(self, processed_count: int, skipped_count: int, failed_count: int) -> None: + """Slot for App.all_tasks_finished signal.""" + logger.info(f"Signal 'all_tasks_finished' received: Processed={processed_count}, Skipped={skipped_count}, Failed={failed_count}") # KEEP INFO - Passes filter + + if self.test_step == "AWAIT_PROCESSING": + logger.debug("Processing completion signal received.") # Covered by the summary log above + if failed_count > 0: + logger.error(f"Processing finished with {failed_count} failed task(s).") + # Even if tasks failed, the test might pass based on output checks. + # The error is logged for information. + self.test_step = "PROCESSING_COMPLETE" + if self.event_loop.isRunning(): + self.event_loop.quit() + else: + logger.warning(f"Signal 'all_tasks_finished' received at an unexpected test step: '{self.test_step}'. Counts: P={processed_count}, S={skipped_count}, F={failed_count}") + + + def _convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> Dict[str, Any]: + """ + Converts a list of SourceRule objects to a dictionary structure + suitable for comparison with the expected_rules.json. + """ + logger.debug(f"Converting {len(source_rules_list)} SourceRule objects to comparable dictionary...") + comparable_sources_list = [] + for source_rule_obj in source_rules_list: + comparable_asset_list = [] + # source_rule_obj.assets is List[AssetRule] + for asset_rule_obj in source_rule_obj.assets: + comparable_file_list = [] + # asset_rule_obj.files is List[FileRule] + for file_rule_obj in asset_rule_obj.files: + comparable_file_list.append({ + "file_path": file_rule_obj.file_path, + "item_type": file_rule_obj.item_type, + "target_asset_name_override": file_rule_obj.target_asset_name_override + }) + comparable_asset_list.append({ + "asset_name": asset_rule_obj.asset_name, + "asset_type": asset_rule_obj.asset_type, + "files": comparable_file_list + }) + comparable_sources_list.append({ + "input_path": Path(source_rule_obj.input_path).name, # Use only the filename + "supplier_identifier": source_rule_obj.supplier_identifier, + "preset_name": source_rule_obj.preset_name, + "assets": comparable_asset_list + }) + logger.debug("Conversion to comparable dictionary finished.") + return {"source_rules": comparable_sources_list} + + def _compare_rule_item(self, actual_item: Dict[str, Any], expected_item: Dict[str, Any], item_type_name: str, parent_context: str = "") -> bool: + """ + Recursively compares an individual actual rule item dictionary with an expected rule item dictionary. + Logs differences and returns True if they match, False otherwise. + """ + item_match = True + + identifier = "" + if item_type_name == "SourceRule": + identifier = expected_item.get('input_path', f'UnknownSource_at_{parent_context}') + elif item_type_name == "AssetRule": + identifier = expected_item.get('asset_name', f'UnknownAsset_at_{parent_context}') + elif item_type_name == "FileRule": + identifier = expected_item.get('file_path', f'UnknownFile_at_{parent_context}') + + current_context = f"{parent_context}/{identifier}" if parent_context else identifier + + # Log Extra Fields: Iterate through keys in actual_item. + # If a key is in actual_item but not in expected_item (and is not a list container like "assets" or "files"), + # log this as an informational message. + for key in actual_item.keys(): + if key not in expected_item and key not in ["assets", "files"]: + logger.debug(f"Field '{key}' present in actual {item_type_name} ({current_context}) but not specified in expected. Value: '{actual_item[key]}'") + + # Check Expected Fields: Iterate through keys in expected_item. + for key, expected_value in expected_item.items(): + if key not in actual_item: + logger.error(f"Missing expected field '{key}' in actual {item_type_name} ({current_context}).") + item_match = False + continue # Continue to check other fields in the expected_item + + actual_value = actual_item[key] + + if key == "assets": # List of AssetRule dictionaries + if not self._compare_list_of_rules(actual_value, expected_value, "AssetRule", current_context, "asset_name"): + item_match = False + elif key == "files": # List of FileRule dictionaries + if not self._compare_list_of_rules(actual_value, expected_value, "FileRule", current_context, "file_path"): + item_match = False + else: # Regular field comparison + if actual_value != expected_value: + # Handle None vs "None" string for preset_name specifically if it's a common issue + if key == "preset_name" and actual_value is None and expected_value == "None": + logger.debug(f"Field '{key}' in {item_type_name} ({current_context}): Actual is None, Expected is string \"None\". Treating as match for now.") + elif key == "target_asset_name_override" and actual_value is not None and expected_value is None: + # If actual has a value (e.g. parent asset name) and expected is null/None, + # this is a mismatch according to strict comparison. + # For a more lenient check, this logic could be adjusted here. + # Current strict comparison will flag this as error, which is what the logs show. + logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.") + item_match = False + else: + logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.") + item_match = False + + return item_match + + def _compare_list_of_rules(self, actual_list: List[Dict[str, Any]], expected_list: List[Dict[str, Any]], item_type_name: str, parent_context: str, item_key_field: str) -> bool: + """ + Compares a list of actual rule items against a list of expected rule items. + Items are matched by a key field (e.g., 'asset_name' or 'file_path'). + Order independent for matching, but logs count mismatches. + """ + list_match = True # Corrected indentation + if not isinstance(actual_list, list) or not isinstance(expected_list, list): + logger.error(f"Type mismatch for list of {item_type_name}s in {parent_context}. Expected lists.") + return False + + if len(actual_list) != len(expected_list): + logger.error(f"Mismatch in number of {item_type_name}s for {parent_context}. Actual: {len(actual_list)}, Expected: {len(expected_list)}.") + list_match = False # Count mismatch is an error + # If counts differ, we still try to match what we can to provide more detailed feedback, + # but the overall list_match will remain False. + + actual_items_map = {item.get(item_key_field): item for item in actual_list if item.get(item_key_field) is not None} + + # Keep track of expected items that found a match to identify missing ones more easily + matched_expected_keys = set() + + for expected_item in expected_list: + expected_key_value = expected_item.get(item_key_field) + if expected_key_value is None: + logger.error(f"Expected {item_type_name} in {parent_context} is missing key field '{item_key_field}'. Cannot compare this item: {expected_item}") + list_match = False # This specific expected item cannot be processed + continue + + actual_item = actual_items_map.get(expected_key_value) + if actual_item: + matched_expected_keys.add(expected_key_value) + if not self._compare_rule_item(actual_item, expected_item, item_type_name, parent_context): + list_match = False # Individual item comparison failed + else: + logger.error(f"Expected {item_type_name} with {item_key_field} '{expected_key_value}' not found in actual items for {parent_context}.") + list_match = False + + # Identify actual items that were not matched by any expected item + # This is useful if len(actual_list) >= len(expected_list) but some actual items are "extra" + for actual_key_value, actual_item_data in actual_items_map.items(): + if actual_key_value not in matched_expected_keys: + logger.debug(f"Extra actual {item_type_name} with {item_key_field} '{actual_key_value}' found in {parent_context} (not in expected list or already matched).") + if len(actual_list) != len(expected_list): # If counts already flagged a mismatch, this is just detail + pass + else: # Counts matched, but content didn't align perfectly by key + list_match = False + + + return list_match # Corrected indentation + + def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool: # Corrected structure: moved out + item_match = False + + return item_match + + def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool: + """ + Compares the actual rule data (converted from live SourceRule objects) + with the expected rule data (loaded from JSON). + """ + logger.debug("Comparing actual rules with expected rules...") + + actual_source_rules = actual_rules_data.get("source_rules", []) if actual_rules_data else [] + expected_source_rules = expected_rules_data.get("source_rules", []) if expected_rules_data else [] + + if not isinstance(actual_source_rules, list): + logger.error(f"Actual 'source_rules' is not a list. Found type: {type(actual_source_rules)}. Comparison aborted.") + return False # Cannot compare if actual data is malformed + if not isinstance(expected_source_rules, list): + logger.error(f"Expected 'source_rules' is not a list. Found type: {type(expected_source_rules)}. Test configuration error. Comparison aborted.") + return False # Test setup error + + if not expected_source_rules and not actual_source_rules: + logger.debug("Both expected and actual source rules lists are empty. Considered a match.") + return True + + if len(actual_source_rules) != len(expected_source_rules): + logger.error(f"Mismatch in the number of source rules. Actual: {len(actual_source_rules)}, Expected: {len(expected_source_rules)}.") + # Optionally, log more details about which list is longer/shorter or identifiers if available + return False + + overall_match_status = True + for i in range(len(expected_source_rules)): + actual_sr = actual_source_rules[i] + expected_sr = expected_source_rules[i] + + # For context, use input_path or an index + source_rule_context = expected_sr.get('input_path', f"SourceRule_index_{i}") + + if not self._compare_rule_item(actual_sr, expected_sr, "SourceRule", parent_context=source_rule_context): + overall_match_status = False + # Continue checking other source rules to log all discrepancies + + if overall_match_status: + logger.debug("All rules match the expected criteria.") # Covered by "Rule comparison successful" summary + else: + logger.warning("One or more rules did not match the expected criteria. See logs above for details.") + + return overall_match_status + + def _process_and_display_logs(self, logs_text: str) -> None: # logs_text is no longer the primary source for search + """ + Processes and displays logs, potentially filtering them if --search is used. + Also checks for tracebacks. + Sources logs from the in-memory handler for search and detailed analysis. + """ + logger.debug("--- Log Analysis ---") + global autotest_memory_handler # Access the global handler + log_records = [] + if autotest_memory_handler and autotest_memory_handler.buffer: + log_records = autotest_memory_handler.buffer + + formatted_log_lines = [] + # Define a consistent formatter, similar to what might be expected or useful for search + record_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s') + # Default asctime format includes milliseconds. + + + for record in log_records: + formatted_log_lines.append(record_formatter.format(record)) + + lines_for_search_and_traceback = formatted_log_lines + + if not lines_for_search_and_traceback: + logger.warning("No log records found in memory handler. No analysis to perform.") + # Still check the console logs_text for tracebacks if it exists, as a fallback + # or if some critical errors didn't make it to the memory handler (unlikely with DEBUG level) + if logs_text: + logger.debug("Checking provided logs_text (from console) for tracebacks as a fallback.") + console_lines = logs_text.splitlines() + traceback_found_console = False + for i, line in enumerate(console_lines): + if line.strip().startswith("Traceback (most recent call last):"): + logger.error(f"!!! TRACEBACK DETECTED in console logs_text around line {i+1} !!!") + traceback_found_console = True + if traceback_found_console: + logger.warning("A traceback was found in the console logs_text.") + else: + logger.info("No tracebacks found in the console logs_text either.") + logger.info("--- End Log Analysis ---") + return + + traceback_found = False + + if self.cli_args.search: + logger.info(f"Searching {len(lines_for_search_and_traceback)} in-memory log lines for term '{self.cli_args.search}' with {self.cli_args.additional_lines} context lines.") + matched_line_indices = [i for i, line in enumerate(lines_for_search_and_traceback) if self.cli_args.search in line] + + if not matched_line_indices: + logger.info(f"Search term '{self.cli_args.search}' not found in in-memory logs.") + else: + logger.info(f"Found {len(matched_line_indices)} match(es) for '{self.cli_args.search}' in in-memory logs:") + collected_lines_to_print = set() + for match_idx in matched_line_indices: + start_idx = max(0, match_idx - self.cli_args.additional_lines) + end_idx = min(len(lines_for_search_and_traceback), match_idx + self.cli_args.additional_lines + 1) + for i in range(start_idx, end_idx): + # Use i directly as index for lines_for_search_and_traceback, line number is for display + collected_lines_to_print.add(f"L{i+1:05d}: {lines_for_search_and_traceback[i]}") + + print("--- Filtered Log Output (from Memory Handler) ---") + for line_to_print in sorted(list(collected_lines_to_print)): + print(line_to_print) + print("--- End Filtered Log Output ---") + # Removed: else block that showed last N lines by default (as per original instruction for this section) + + # Traceback Check (on lines_for_search_and_traceback) + for i, line in enumerate(lines_for_search_and_traceback): + if line.strip().startswith("Traceback (most recent call last):") or "Traceback (most recent call last):" in line : # More robust check + logger.error(f"!!! TRACEBACK DETECTED in in-memory logs around line index {i} !!!") + logger.error(f"Line content: {line}") + traceback_found = True + + if traceback_found: + logger.warning("A traceback was found in the in-memory logs. This usually indicates a significant issue.") + else: + logger.info("No tracebacks found in the in-memory logs.") # This refers to the comprehensive memory logs + + logger.info("--- End Log Analysis ---") + + def cleanup_and_exit(self, success: bool = True) -> None: + """Cleans up and exits the application.""" + global autotest_memory_handler + if autotest_memory_handler: + logger.debug("Clearing memory log handler buffer and removing handler.") + autotest_memory_handler.buffer = [] # Clear buffer + logging.getLogger().removeHandler(autotest_memory_handler) # Remove handler + autotest_memory_handler.close() # MemoryHandler close is a no-op but good practice + autotest_memory_handler = None + + logger.info(f"Test {'succeeded' if success else 'failed'}. Cleaning up and exiting...") # KEEP INFO - Passes filter + q_app = QCoreApplication.instance() + if q_app: + q_app.quit() + sys.exit(0 if success else 1) + +# --- Main Execution --- +def main(): + """Main function to run the autotest script.""" + cli_args = parse_arguments() + # Logger is configured above, this will now use the new filtered setup + logger.info(f"Parsed CLI arguments: {cli_args}") # KEEP INFO - Passes filter + + # Clean and ensure output directory exists + output_dir_path = Path(cli_args.outputdir) + logger.debug(f"Preparing output directory: {output_dir_path}") + try: + if output_dir_path.exists(): + logger.debug(f"Output directory {output_dir_path} exists. Cleaning its contents...") + for item in output_dir_path.iterdir(): + if item.is_dir(): + shutil.rmtree(item) + logger.debug(f"Removed directory: {item}") + else: + item.unlink() + logger.debug(f"Removed file: {item}") + logger.debug(f"Contents of {output_dir_path} cleaned.") + else: + logger.debug(f"Output directory {output_dir_path} does not exist. Creating it.") + + output_dir_path.mkdir(parents=True, exist_ok=True) # Ensure it exists after cleaning/if it didn't exist + logger.debug(f"Output directory {output_dir_path} is ready.") + + except Exception as e: + logger.error(f"Could not prepare output directory {output_dir_path}: {e}", exc_info=True) + sys.exit(1) + + # Initialize QApplication + # Use QCoreApplication if no GUI elements are directly interacted with by the test logic itself, + # but QApplication is needed if MainWindow or its widgets are constructed and used. + # Since MainWindow is instantiated by App, QApplication is appropriate. + q_app = QApplication.instance() + if not q_app: + q_app = QApplication(sys.argv) + if not q_app: # Still no app + logger.error("Failed to initialize QApplication.") + sys.exit(1) + + logger.debug("Initializing main.App()...") + try: + # Instantiate main.App() - this should create MainWindow but not show it by default + # if App is designed to not show GUI unless app.main_window.show() is called. + app_instance = App() + except Exception as e: + logger.error(f"Failed to initialize main.App: {e}", exc_info=True) + sys.exit(1) + + if not app_instance.main_window: + logger.error("main.App initialized, but main_window is None. Cannot proceed with test.") + sys.exit(1) + + logger.debug("Initializing AutoTester...") + try: + tester = AutoTester(app_instance, cli_args) + except Exception as e: + logger.error(f"Failed to initialize AutoTester: {e}", exc_info=True) + sys.exit(1) + + # Use QTimer.singleShot to start the test after the Qt event loop has started. + # This ensures that the Qt environment is fully set up. + logger.debug("Scheduling test run...") + QTimer.singleShot(0, tester.run_test) + + logger.debug("Starting Qt application event loop...") + exit_code = q_app.exec() + logger.debug(f"Qt application event loop finished with exit code: {exit_code}") + sys.exit(exit_code) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/main.py b/main.py index 1cc864a..2e04852 100644 --- a/main.py +++ b/main.py @@ -401,11 +401,13 @@ class App(QObject): # --- Get paths needed for ProcessingTask --- try: - # Access output path via MainPanelWidget - output_base_path_str = self.main_window.main_panel_widget.output_path_edit.text().strip() + # Get output_dir from processing_settings passed from autotest.py + output_base_path_str = processing_settings.get("output_dir") + log.info(f"APP_DEBUG: Received output_dir in processing_settings: {output_base_path_str}") + if not output_base_path_str: - log.error("Cannot queue tasks: Output directory path is empty in the GUI.") - self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000) + log.error("Cannot queue tasks: Output directory path is empty in processing_settings.") + # self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000) # GUI specific return output_base_path = Path(output_base_path_str) # Basic validation - check if it's likely a valid path structure (doesn't guarantee existence/writability here) @@ -477,8 +479,9 @@ class App(QObject): engine=task_engine, rule=rule, workspace_path=workspace_path, - output_base_path=output_base_path + output_base_path=output_base_path # This is Path(output_base_path_str) ) + log.info(f"APP_DEBUG: Passing to ProcessingTask: output_base_path = {output_base_path}") task.signals.finished.connect(self._on_task_finished) log.debug(f"DEBUG: Calling thread_pool.start() for task {i+1}") self.thread_pool.start(task) diff --git a/processing/pipeline/stages/metadata_initialization.py b/processing/pipeline/stages/metadata_initialization.py index f938ff5..e77ef96 100644 --- a/processing/pipeline/stages/metadata_initialization.py +++ b/processing/pipeline/stages/metadata_initialization.py @@ -85,6 +85,7 @@ class MetadataInitializationStage(ProcessingStage): merged_maps_details. """ def execute(self, context: AssetProcessingContext) -> AssetProcessingContext: + logger.debug(f"METADATA_INIT_DEBUG: Entry - context.output_base_path = {context.output_base_path}") # Added """ Executes the metadata initialization logic. @@ -170,4 +171,5 @@ class MetadataInitializationStage(ProcessingStage): # Example of how you might log the full metadata for debugging: # logger.debug(f"Initialized metadata: {context.asset_metadata}") + logger.debug(f"METADATA_INIT_DEBUG: Exit - context.output_base_path = {context.output_base_path}") # Added return context \ No newline at end of file diff --git a/processing/pipeline/stages/output_organization.py b/processing/pipeline/stages/output_organization.py index 7a9d9d0..db92653 100644 --- a/processing/pipeline/stages/output_organization.py +++ b/processing/pipeline/stages/output_organization.py @@ -17,8 +17,16 @@ class OutputOrganizationStage(ProcessingStage): """ def execute(self, context: AssetProcessingContext) -> AssetProcessingContext: - log.info("OUTPUT_ORG: Stage execution started for asset '%s'", context.asset_rule.asset_name) - log.info(f"OUTPUT_ORG: context.processed_maps_details at start: {context.processed_maps_details}") + asset_name_for_log_early = context.asset_rule.asset_name if hasattr(context, 'asset_rule') and context.asset_rule else "Unknown Asset (early)" + log.info(f"OUTPUT_ORG_DEBUG: Stage execution started for asset '{asset_name_for_log_early}'.") + logger.debug(f"OUTPUT_ORG_DEBUG: Entry - context.output_base_path = {context.output_base_path}") # Modified + log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj.output_directory_base (raw from config) = {getattr(context.config_obj, 'output_directory_base', 'N/A')}") + # resolved_base = "N/A" + # if hasattr(context.config_obj, '_settings') and context.config_obj._settings.get('OUTPUT_BASE_DIR'): + # base_dir_from_settings = context.config_obj._settings.get('OUTPUT_BASE_DIR') + # Path resolution logic might be complex + # log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj._settings.OUTPUT_BASE_DIR (resolved guess) = {resolved_base}") + log.info(f"OUTPUT_ORG_DEBUG: context.processed_maps_details at start: {context.processed_maps_details}") """ Copies temporary processed and merged files to their final output locations based on path patterns and updates AssetProcessingContext. @@ -103,7 +111,9 @@ class OutputOrganizationStage(ProcessingStage): pattern_string=output_dir_pattern, token_data=token_data_variant_cleaned ) + logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Using context.output_base_path = {context.output_base_path} for final_variant_path construction.") # Added final_variant_path = Path(context.output_base_path) / Path(relative_dir_path_str_variant) / Path(output_filename_variant) + logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Constructed final_variant_path = {final_variant_path}") # Added final_variant_path.parent.mkdir(parents=True, exist_ok=True) if final_variant_path.exists() and not overwrite_existing: @@ -169,7 +179,9 @@ class OutputOrganizationStage(ProcessingStage): pattern_string=output_dir_pattern, token_data=token_data_cleaned ) + logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Using context.output_base_path = {context.output_base_path} for final_path construction.") # Added final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename) + logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Constructed final_path = {final_path}") # Added final_path.parent.mkdir(parents=True, exist_ok=True) if final_path.exists() and not overwrite_existing: @@ -245,10 +257,12 @@ class OutputOrganizationStage(ProcessingStage): token_data=base_token_data_cleaned ) # Destination: /// + logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Using context.output_base_path = {context.output_base_path} for final_dest_path construction.") # Added final_dest_path = (Path(context.output_base_path) / Path(asset_base_output_dir_str) / Path(extra_subdir_name) / source_file_path.name) # Use original filename + logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Constructed final_dest_path = {final_dest_path}") # Added final_dest_path.parent.mkdir(parents=True, exist_ok=True)