Merge branch 'Dev' into GUI-and-Configs
commit dfe6500141
.gitignore (vendored, 4 lines changed)
@@ -30,6 +30,6 @@ Thumbs.db
-gui/__pycache__
+__pycache__
 
-Testfiles
+Testfiles/
 
-Testfiles/TestOutputs
+Testfiles_
AUTOTEST_GUI_PLAN.md (new file, 112 lines)
@@ -0,0 +1,112 @@
# Plan for Autotest GUI Mode Implementation

**I. Objective:**

Create an `autotest.py` script that can launch the Asset Processor GUI headlessly, load a predefined asset (`.zip`), select a predefined preset, verify the predicted rule structure against an expected JSON file, trigger processing to a predefined output directory, check the output, and analyze the logs for errors or specific messages. This serves as a sanity check for the core GUI-driven workflows.

**II. `TestFiles` Directory:**

A new directory named `TestFiles` will be created in the project root (`c:/Users/Theis/Assetprocessor/Asset-Frameworker/TestFiles/`). This directory will house:

* Sample asset `.zip` files for testing (e.g., `TestFiles/SampleAsset1.zip`).
* Expected rule structure JSON files (e.g., `TestFiles/SampleAsset1_PresetX_expected_rules.json`).
* A subdirectory for test outputs (e.g., `TestFiles/TestOutputs/`).

**III. `autotest.py` Script:**

1. **Location:** `c:/Users/Theis/Assetprocessor/Asset-Frameworker/autotest.py` (or `scripts/autotest.py`).
2. **Command-Line Arguments (with defaults pointing to `TestFiles/`):**
    * `--zipfile`: Path to the test asset. Default: `TestFiles/default_test_asset.zip`.
    * `--preset`: Name of the preset. Default: `DefaultTestPreset`.
    * `--expectedrules`: Path to the expected rules JSON. Default: `TestFiles/default_test_asset_rules.json`.
    * `--outputdir`: Path for processing output. Default: `TestFiles/TestOutputs/DefaultTestOutput`.
    * `--search` (optional): Log search term. Default: `None`.
    * `--additional-lines` (optional): Context lines for log search. Default: `0`.
3. **Core Structure:**
    * Imports the necessary modules from the main application and PySide6.
    * Adds the project root to `sys.path` for imports.
    * `AutoTester` class:
        * **`__init__(self, app_instance: App)`:**
            * Stores `app_instance` and `main_window`.
            * Initializes a `QEventLoop`.
            * Connects `app_instance.all_tasks_finished` to `self._on_all_tasks_finished`.
            * Loads the expected rules from the `--expectedrules` file.
        * **`run_test(self)`:** Orchestrates the test steps sequentially:
            1. Load ZIP (`main_window.add_input_paths()`).
            2. Select Preset (`main_window.preset_editor_widget.editor_preset_list.setCurrentItem()`).
            3. Await Prediction (use a `QTimer` to poll `main_window._pending_predictions`, managed with a `QEventLoop`; see the sketch after this section).
            4. Retrieve & Compare Rulelist:
                * Get the actual rules: `main_window.unified_model.get_all_source_rules()`.
                * Convert the actual rules to a comparable dict (`_convert_rules_to_comparable()`).
                * Compare with the loaded expected rules (`_compare_rules()`). On a mismatch, log and fail.
            5. Start Processing (emit `main_window.start_backend_processing` with the rules and output settings).
            6. Await Processing (use the `QEventLoop`, waiting for `_on_all_tasks_finished`).
            7. Check Output Path (verify the output directory exists, list its contents, and run basic sanity checks such as non-emptiness or the presence of key asset folders).
            8. Retrieve & Analyze Logs (`main_window.log_console.log_console_output.toPlainText()`, filter by `--search`, check for tracebacks).
            9. Report the result and call `cleanup_and_exit()`.
        * **`_check_prediction_status(self)`:** Slot for the prediction polling timer.
        * **`_on_all_tasks_finished(self, processed_count, skipped_count, failed_count)`:** Slot for the `App.all_tasks_finished` signal.
        * **`_convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> dict`:** Converts `SourceRule` objects to the JSON structure defined below.
        * **`_compare_rules(self, actual_rules_data: dict, expected_rules_data: dict) -> bool`:** Implements the Option 1 comparison logic:
            * Errors if an expected field is missing or its value mismatches.
            * Logs (but does not error on) fields present in the actual data but not in the expected data.
        * **`_process_and_display_logs(self, logs_text: str)`:** Handles log filtering and display.
        * **`cleanup_and_exit(self, success=True)`:** Quits `QCoreApplication` and calls `sys.exit()`.
    * `main()` function:
        * Parses the CLI arguments.
        * Initializes `QApplication`.
        * Instantiates `main.App()` (does *not* show the GUI).
        * Instantiates `AutoTester(app_instance)`.
        * Uses `QTimer.singleShot(0, tester.run_test)` to start the test.
        * Runs `q_app.exec()`.

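The wait-without-blocking pattern used in steps 3 and 6 can be hard to picture from the bullet list alone. The following is a minimal, hypothetical sketch of the polling variant (step 3); it only assumes that `main_window` exposes a `_pending_predictions` container that empties once prediction has finished. Step 6 uses the same nested-event-loop idea, but quits the loop from the `all_tasks_finished` slot instead of a timer.

```python
from PySide6.QtCore import QEventLoop, QTimer

def wait_for_predictions(main_window, poll_interval_ms: int = 500) -> None:
    """Spin a local event loop until the GUI reports no pending predictions.

    Assumes main_window keeps a `_pending_predictions` container that
    becomes empty once rule prediction has completed (step 3 above).
    """
    loop = QEventLoop()
    timer = QTimer()
    timer.setInterval(poll_interval_ms)

    def poll():
        # Quit the nested loop as soon as nothing is pending any more.
        if not getattr(main_window, "_pending_predictions", ()):
            loop.quit()

    timer.timeout.connect(poll)
    timer.start()
    loop.exec()   # Qt keeps processing GUI and backend events while we wait
    timer.stop()
```
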
**IV. `expected_rules.json` Structure (Revised):**

Located in `TestFiles/`. Example: `TestFiles/SampleAsset1_PresetX_expected_rules.json`. A minimal comparison sketch follows the example.

```json
{
  "source_rules": [
    {
      "input_path": "SampleAsset1.zip",
      "supplier_identifier": "ExpectedSupplier",
      "preset_name": "PresetX",
      "assets": [
        {
          "asset_name": "AssetNameFromPrediction",
          "asset_type": "Prop",
          "files": [
            {
              "file_path": "relative/path/to/file1.png",
              "item_type": "MAP_COL",
              "target_asset_name_override": null
            }
          ]
        }
      ]
    }
  ]
}
```

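To make the Option 1 semantics concrete, here is a minimal, hypothetical sketch of the field-by-field check: missing or mismatching expected fields fail the comparison, while extra fields in the actual data are only logged. The real `_compare_rules()` in `autotest.py` layers list matching by key (`asset_name`, `file_path`) on top of this idea.

```python
import logging

log = logging.getLogger("autotest.compare")

def compare_item(actual: dict, expected: dict, context: str = "") -> bool:
    """Option 1: every expected field must exist and match; extra actual
    fields are reported but never fail the comparison."""
    ok = True
    for key, expected_value in expected.items():
        if key not in actual:
            log.error("%s: missing expected field '%s'", context, key)
            ok = False
        elif isinstance(expected_value, dict) and isinstance(actual[key], dict):
            ok = compare_item(actual[key], expected_value, f"{context}/{key}") and ok
        elif actual[key] != expected_value:
            log.error("%s: field '%s' is %r, expected %r",
                      context, key, actual[key], expected_value)
            ok = False
    # Fields only present in the actual data are informational.
    for key in actual.keys() - expected.keys():
        log.info("%s: extra field '%s' in actual rules (ignored)", context, key)
    return ok

# Typical call, assuming both sides use the structure shown above:
# compare_item(actual_rules_data, expected_rules_data)
```
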
**V. Mermaid Diagram of Autotest Flow:**

```mermaid
graph TD
    A["Start autotest.py with CLI Args (defaults to TestFiles/)"] --> B{"Setup Args & Logging"};
    B --> C["Init QApplication & main.App (GUI Headless)"];
    C --> D["Instantiate AutoTester(app_instance)"];
    D --> E["QTimer.singleShot -> AutoTester.run_test()"];

    subgraph "AutoTester.run_test()"
        E --> F["Load Expected Rules from --expectedrules JSON"];
        F --> G["Load ZIP (--zipfile) via main_window.add_input_paths()"];
        G --> H["Select Preset (--preset) via main_window.preset_editor_widget"];
        H --> I["Await Prediction (Poll main_window._pending_predictions via QTimer & QEventLoop)"];
        I -- Prediction Done --> J["Get Actual Rules from main_window.unified_model"];
        J --> K["Convert Actual Rules to Comparable JSON Structure"];
        K --> L{"Compare Actual vs Expected Rules (Option 1 Logic)"};
        L -- Match --> M["Start Processing (Emit main_window.start_backend_processing with --outputdir)"];
        L -- Mismatch --> ZFAIL["Log Mismatch & Call cleanup_and_exit(False)"];
        M --> N["Await Processing (QEventLoop for App.all_tasks_finished signal)"];
        N -- Processing Done --> O["Check Output Dir (--outputdir): Exists? Not Empty? Key Asset Folders?"];
        O --> P["Retrieve & Analyze Logs (Search, Tracebacks)"];
        P --> Q["Log Test Success & Call cleanup_and_exit(True)"];
    end

    ZFAIL --> ZEND["AutoTester.cleanup_and_exit() -> QCoreApplication.quit() & sys.exit()"];
    Q --> ZEND;
```
Documentation/01_User_Guide/11_Usage_Autotest.md (new file, 83 lines)
@@ -0,0 +1,83 @@
# User Guide: Usage - Automated GUI Testing (`autotest.py`)

This document explains how to use the `autotest.py` script for automated sanity checks of the Asset Processor Tool's GUI-driven workflow.

## Overview

The `autotest.py` script provides a way to run predefined test scenarios headlessly (without displaying the GUI). It simulates the core user actions (loading an asset, selecting a preset, allowing rules to be predicted, and processing the asset) and then checks the results against expectations. It is primarily intended as a developer tool for regression testing and for ensuring that core functionality remains stable.

## Running the Autotest Script

From the project root directory, run the script with Python:

```bash
python autotest.py [OPTIONS]
```

### Command-Line Options

The script accepts several command-line arguments to configure the test run. Any argument that is not provided falls back to a predefined default.

* `--zipfile PATH_TO_ZIP`:
    * Path to the input asset `.zip` file to use for the test.
    * Default: `TestFiles/BoucleChunky001.zip`
* `--preset PRESET_NAME`:
    * Name of the preset to select and use for rule prediction and processing.
    * Default: `Dinesen`
* `--expectedrules PATH_TO_JSON`:
    * Path to a JSON file containing the expected rule structure that should be generated once the preset has been applied to the input asset.
    * Default: `TestFiles/Test-BoucleChunky001.json`
* `--outputdir PATH_TO_DIR`:
    * Directory where the processed assets will be written.
    * Default: `TestFiles/TestOutputs/BoucleChunkyOutput`
* `--search "SEARCH_TERM"` (optional):
    * A string to search for within the application logs generated during the test run. If found, matching log lines (with context) are highlighted.
    * Default: None
* `--additional-lines NUM_LINES` (optional):
    * When using `--search`, the number of context lines to display before and after each matching log line.
    * Default: `0`

**Example Usage:**

```bash
# Run with default test files and settings
python autotest.py

# Run with specific test files and search for a log message
python autotest.py --zipfile TestFiles/MySpecificAsset.zip --preset MyPreset --expectedrules TestFiles/MySpecificAsset_rules.json --outputdir TestFiles/TestOutputs/MySpecificOutput --search "Processing complete for asset"
```

## `TestFiles` Directory

The autotest script relies on a directory named `TestFiles` located in the project root. This directory should contain:

* **Test asset `.zip` files:** The asset archives used as test input (e.g., `default_test_asset.zip`, `MySpecificAsset.zip`).
* **Expected rules `.json` files:** JSON files defining the expected rule structure for a given asset and preset combination (e.g., `default_test_asset_rules.json`, `MySpecificAsset_rules.json`). The structure of this file is detailed in the main autotest plan (`AUTOTEST_GUI_PLAN.md`).
* **`TestOutputs/` subdirectory:** The default parent directory in which the autotest script creates a specific output folder for each test run (e.g., `TestFiles/TestOutputs/DefaultTestOutput/`).

## Test Workflow

When executed, `autotest.py` performs the following steps:

1. **Initialization:** Parses command-line arguments and initializes the main application components headlessly.
2. **Load Expected Rules:** Loads the expected rules JSON file.
3. **Load Asset:** Loads the specified `.zip` file into the application.
4. **Select Preset:** Selects the specified preset, which triggers the internal rule prediction process.
5. **Await Prediction:** Waits for rule prediction to complete.
6. **Compare Rules:** Retrieves the predicted rules from the application and compares them against the loaded expected rules. On a mismatch, the test fails at this point.
7. **Start Processing:** If the rules match, starts the asset processing pipeline, directing output to the specified output directory.
8. **Await Processing:** Waits for all backend processing tasks to complete.
9. **Check Output:** Verifies that the output directory exists and lists its contents; basic checks ensure some output was generated.
10. **Analyze Logs:** Retrieves the application logs. If a search term was provided, filters and displays the relevant portions; it also checks for Python tracebacks, which usually indicate a failure.
11. **Report Result:** Prints a summary of the test outcome (success or failure) and exits with an appropriate status code (0 for success, 1 for failure).

## Interpreting Results

* **Console Output:** The script logs its progress and the result of each step to the console.
* **Log Analysis:** Pay attention to the log output, especially if a `--search` term was used or if any tracebacks are reported.
* **Exit Code:**
    * `0`: Test completed successfully.
    * `1`: Test failed at some point (e.g., rule mismatch, processing error, traceback found). An example of driving the script from another test runner via its exit code follows below.
* **Output Directory:** Inspect the contents of the specified output directory to manually verify the processed assets if needed.

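Because the script communicates success purely through its exit code, it can be wrapped by any test runner. Below is a minimal, hypothetical example using `pytest` and `subprocess`; the wrapper file name and the timeout value are arbitrary choices, not part of the tool.

```python
# test_autotest_smoke.py (hypothetical wrapper, placed in the project root)
import subprocess
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parent

def test_autotest_default_scenario():
    """Run autotest.py with its default test files and assert it exits with 0."""
    result = subprocess.run(
        [sys.executable, str(PROJECT_ROOT / "autotest.py")],
        cwd=PROJECT_ROOT,
        capture_output=True,
        text=True,
        timeout=600,  # generous ceiling; processing time depends on the asset
    )
    # Surface the script's own log output when the assertion fails.
    assert result.returncode == 0, result.stdout + result.stderr
```
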
This automated test helps ensure the stability of the core processing logic when driven by GUI-equivalent actions.
TestFiles/BoucleChunky001.zip (new file, binary, not shown)
TestFiles/Test-BoucleChunky001.json (new file, 57 lines)
@@ -0,0 +1,57 @@
{
  "source_rules": [
    {
      "input_path": "BoucleChunky001.zip",
      "supplier_identifier": "Dinesen",
      "preset_name": null,
      "assets": [
        {
          "asset_name": "BoucleChunky001",
          "asset_type": "Surface",
          "files": [
            {
              "file_path": "BoucleChunky001_AO_1K_METALNESS.png",
              "item_type": "MAP_AO",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_COL_1K_METALNESS.png",
              "item_type": "MAP_COL",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_DISP16_1K_METALNESS.png",
              "item_type": "MAP_DISP",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_DISP_1K_METALNESS.png",
              "item_type": "MAP_DISP",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_Fabric.png",
              "item_type": "EXTRA",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_METALNESS_1K_METALNESS.png",
              "item_type": "MAP_METAL",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_NRM_1K_METALNESS.png",
              "item_type": "MAP_NRM",
              "target_asset_name_override": "BoucleChunky001"
            },
            {
              "file_path": "BoucleChunky001_ROUGHNESS_1K_METALNESS.png",
              "item_type": "MAP_ROUGH",
              "target_asset_name_override": "BoucleChunky001"
            }
          ]
        }
      ]
    }
  ]
}
autotest.py (new file, 863 lines)
@@ -0,0 +1,863 @@
import argparse
|
||||
import sys
|
||||
import logging
|
||||
import logging.handlers
|
||||
import time
|
||||
import json
|
||||
import shutil # Import shutil for directory operations
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from PySide6.QtCore import QCoreApplication, QTimer, Slot, QEventLoop, QObject, Signal
|
||||
from PySide6.QtWidgets import QApplication, QListWidgetItem
|
||||
|
||||
# Add project root to sys.path
|
||||
project_root = Path(__file__).resolve().parent
|
||||
if str(project_root) not in sys.path:
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
try:
|
||||
from main import App
|
||||
from gui.main_window import MainWindow
|
||||
from rule_structure import SourceRule # Assuming SourceRule is in rule_structure.py
|
||||
except ImportError as e:
|
||||
print(f"Error importing project modules: {e}")
|
||||
print(f"Ensure that the script is run from the project root or that the project root is in PYTHONPATH.")
|
||||
print(f"Current sys.path: {sys.path}")
|
||||
sys.exit(1)
|
||||
|
||||
# Global variable for the memory log handler
|
||||
autotest_memory_handler = None
|
||||
|
||||
# Custom Log Filter for Concise Output
|
||||
class InfoSummaryFilter(logging.Filter):
|
||||
# Keywords that identify INFO messages to *allow* for concise output
|
||||
SUMMARY_KEYWORDS_PRECISE = [
|
||||
"Test run completed",
|
||||
"Test succeeded",
|
||||
"Test failed",
|
||||
"Rule comparison successful",
|
||||
"Rule comparison failed",
|
||||
"ProcessingEngine finished. Summary:",
|
||||
"Autotest Context:",
|
||||
"Parsed CLI arguments:",
|
||||
"Prediction completed successfully.",
|
||||
"Processing completed.",
|
||||
"Signal 'all_tasks_finished' received",
|
||||
"final status:", # To catch "Asset '...' final status:"
|
||||
"User settings file not found:",
|
||||
"MainPanelWidget: Default output directory set to:",
|
||||
# Search related (as per original filter)
|
||||
"Searching logs for term",
|
||||
"Search term ",
|
||||
"Found ",
|
||||
"No tracebacks found in the logs.",
|
||||
"--- End Log Analysis ---",
|
||||
"Log analysis completed.",
|
||||
]
|
||||
# Patterns for case-insensitive rejection
|
||||
REJECT_PATTERNS_LOWER = [
|
||||
# Original debug prefixes (ensure these are still relevant or merge if needed)
|
||||
"debug:", "orchestrator_trace:", "configuration_debug:", "app_debug:", "output_org_debug:",
|
||||
# Iterative / Per-item / Per-file details / Intermediate steps
|
||||
": item ", # Catches "Asset '...', Item X/Y"
|
||||
"item successfully processed and saved",
|
||||
", file '", # Catches "Asset '...', File '...'"
|
||||
": processing regular map",
|
||||
": found source file:",
|
||||
": determined source bit depth:",
|
||||
"successfully processed regular map",
|
||||
"successfully created mergetaskdefinition",
|
||||
": preparing processing items",
|
||||
": finished preparing items. found",
|
||||
": starting core item processing loop",
|
||||
", task '",
|
||||
": processing merge task",
|
||||
"loaded from context:",
|
||||
"using dimensions from first loaded input",
|
||||
"successfully merged inputs into image",
|
||||
"successfully processed merge task",
|
||||
"mergedtaskprocessorstage result",
|
||||
"calling savevariantsstage",
|
||||
"savevariantsstage result",
|
||||
"adding final details to context",
|
||||
": finished core item processing loop",
|
||||
": copied variant",
|
||||
": copied extra file",
|
||||
": successfully organized",
|
||||
": output organization complete.",
|
||||
": metadata saved to",
|
||||
"worker thread: starting processing for rule:",
|
||||
"preparing workspace for input:",
|
||||
"input is a supported archive",
|
||||
"calling processingengine.process with rule",
|
||||
"calculated sha5 for",
|
||||
"calculated next incrementing value for",
|
||||
"verify: processingengine.process called",
|
||||
": effective supplier set to",
|
||||
": metadata initialized.",
|
||||
": file rules queued for processing",
|
||||
"successfully loaded base application settings",
|
||||
"successfully loaded and merged asset_type_definitions",
|
||||
"successfully loaded and merged file_type_definitions",
|
||||
"starting rule-based prediction for:",
|
||||
"rule-based prediction finished successfully for",
|
||||
"finished rule-based prediction run for",
|
||||
"updating model with rule-based results for source:",
|
||||
"debug task ",
|
||||
"worker thread: finished processing for rule:",
|
||||
"task finished signal received for",
|
||||
# Autotest step markers (not global summaries)
|
||||
"step 1: loading zip file:",
|
||||
"step 2: selecting preset:",
|
||||
"step 4: retrieving and comparing rules...",
|
||||
"step 5: starting processing...",
|
||||
"step 7: checking output path:",
|
||||
"output path check completed.",
|
||||
]
|
||||
|
||||
def filter(self, record):
|
||||
# Allow CRITICAL, ERROR, WARNING unconditionally
|
||||
if record.levelno >= logging.WARNING:
|
||||
return True
|
||||
|
||||
if record.levelno == logging.INFO:
|
||||
msg = record.getMessage()
|
||||
msg_lower = msg.lower() # For case-insensitive pattern rejection
|
||||
|
||||
# 1. Explicitly REJECT if message contains verbose patterns (case-insensitive)
|
||||
for pattern in self.REJECT_PATTERNS_LOWER: # Use the new list
|
||||
if pattern in msg_lower:
|
||||
return False # Reject
|
||||
|
||||
# 2. Then, if not rejected, ALLOW only if message contains precise summary keywords
|
||||
for keyword in self.SUMMARY_KEYWORDS_PRECISE: # Use the new list
|
||||
if keyword in msg: # Original message for case-sensitive summary keywords if needed
|
||||
return True # Allow
|
||||
|
||||
# 3. Reject all other INFO messages that don't match precise summary keywords
|
||||
return False
|
||||
|
||||
# Reject levels below INFO (e.g., DEBUG) by default for this handler
|
||||
return False
|
||||
|
||||
# --- Root Logger Configuration for Concise Console Output ---
|
||||
def setup_autotest_logging():
|
||||
"""
|
||||
Configures the root logger for concise console output for autotest.py.
|
||||
This ensures that only essential summary information, warnings, and errors
|
||||
are displayed on the console by default.
|
||||
"""
|
||||
root_logger = logging.getLogger()
|
||||
|
||||
# 1. Remove all existing handlers from the root logger.
|
||||
# This prevents interference from other logging configurations.
|
||||
for handler in root_logger.handlers[:]:
|
||||
root_logger.removeHandler(handler)
|
||||
handler.close() # Close handler before removing
|
||||
|
||||
# 2. Set the root logger's level to DEBUG to capture everything for the memory handler.
|
||||
# The console handler will still filter down to INFO/selected.
|
||||
root_logger.setLevel(logging.DEBUG) # Changed from INFO to DEBUG
|
||||
|
||||
# 3. Create a new StreamHandler for sys.stdout (for concise console output).
|
||||
console_handler = logging.StreamHandler(sys.stdout)
|
||||
|
||||
# 4. Set this console handler's level to INFO.
|
||||
# The filter will then decide which INFO messages to display on console.
|
||||
console_handler.setLevel(logging.INFO)
|
||||
|
||||
# 5. Apply the enhanced InfoSummaryFilter to the console handler.
|
||||
info_filter = InfoSummaryFilter()
|
||||
console_handler.addFilter(info_filter)
|
||||
|
||||
# 6. Set a concise formatter for the console handler.
|
||||
formatter = logging.Formatter('[%(levelname)s] %(message)s')
|
||||
console_handler.setFormatter(formatter)
|
||||
|
||||
# 7. Add this newly configured console handler to the root_logger.
|
||||
root_logger.addHandler(console_handler)
|
||||
|
||||
# 8. Setup the MemoryHandler
|
||||
global autotest_memory_handler # Declare usage of global
|
||||
autotest_memory_handler = logging.handlers.MemoryHandler(
|
||||
capacity=20000, # Increased capacity
|
||||
flushLevel=logging.CRITICAL + 1, # Prevent automatic flushing
|
||||
target=None # Does not flush to another handler
|
||||
)
|
||||
autotest_memory_handler.setLevel(logging.DEBUG) # Capture all logs from DEBUG up
|
||||
# Not adding a formatter here, will format in _process_and_display_logs
|
||||
|
||||
# 9. Add the memory handler to the root logger.
|
||||
root_logger.addHandler(autotest_memory_handler)
|
||||
|
||||
# Call the setup function early in the script's execution.
|
||||
setup_autotest_logging()
|
||||
|
||||
# Logger for autotest.py's own messages.
|
||||
# Messages from this logger will propagate to the root logger and be filtered
|
||||
# by the console_handler configured above.
|
||||
# Setting its level to DEBUG allows autotest.py to generate DEBUG messages,
|
||||
# which won't appear on the concise console (due to handler's INFO level)
|
||||
# but can be captured by other handlers (e.g., the GUI's log console).
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG) # Ensure autotest.py can generate DEBUGs for other handlers
|
||||
|
||||
# Note: The GUI's log console (e.g., self.main_window.log_console.log_console_output)
|
||||
# is assumed to capture all logs (including DEBUG) from various modules.
|
||||
# The _process_and_display_logs function then uses these comprehensive logs for the --search feature.
|
||||
# This root logger setup primarily makes autotest.py's direct console output concise,
|
||||
# ensuring that only filtered, high-level information appears on stdout by default.
|
||||
# --- End of Root Logger Configuration ---
|
||||
|
||||
# --- Argument Parsing ---
|
||||
def parse_arguments():
|
||||
"""Parses command-line arguments for the autotest script."""
|
||||
parser = argparse.ArgumentParser(description="Automated test script for Asset Processor GUI.")
|
||||
parser.add_argument(
|
||||
"--zipfile",
|
||||
type=Path,
|
||||
default=project_root / "TestFiles" / "BoucleChunky001.zip",
|
||||
help="Path to the test asset ZIP file. Default: TestFiles/BoucleChunky001.zip"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--preset",
|
||||
type=str,
|
||||
default="Dinesen", # This should match a preset name in the application
|
||||
help="Name of the preset to use. Default: Dinesen"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--expectedrules",
|
||||
type=Path,
|
||||
default=project_root / "TestFiles" / "Test-BoucleChunky001.json",
|
||||
help="Path to the JSON file with expected rules. Default: TestFiles/Test-BoucleChunky001.json"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--outputdir",
|
||||
type=Path,
|
||||
default=project_root / "TestFiles" / "TestOutputs" / "BoucleChunkyOutput",
|
||||
help="Path for processing output. Default: TestFiles/TestOutputs/BoucleChunkyOutput"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--search",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Optional log search term. Default: None"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--additional-lines",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Context lines for log search. Default: 0"
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
class AutoTester(QObject):
|
||||
"""
|
||||
Handles the automated testing process for the Asset Processor GUI.
|
||||
"""
|
||||
# Define signals if needed, e.g., for specific test events
|
||||
# test_step_completed = Signal(str)
|
||||
|
||||
def __init__(self, app_instance: App, cli_args: argparse.Namespace):
|
||||
super().__init__()
|
||||
self.app_instance: App = app_instance
|
||||
self.main_window: MainWindow = app_instance.main_window
|
||||
self.cli_args: argparse.Namespace = cli_args
|
||||
self.event_loop = QEventLoop(self)
|
||||
self.prediction_poll_timer = QTimer(self)
|
||||
self.expected_rules_data: Dict[str, Any] = {}
|
||||
self.test_step: str = "INIT" # Progresses through: INIT, LOADING_EXPECTED_RULES, LOADING_ZIP, SELECTING_PRESET, AWAITING_PREDICTION, PREDICTION_COMPLETE, COMPARING_RULES, START_PROCESSING, AWAIT_PROCESSING, PROCESSING_COMPLETE, CHECK_OUTPUT, CHECK_LOGS
|
||||
|
||||
if not self.main_window:
|
||||
logger.error("MainWindow instance not found in App. Cannot proceed.")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
# Connect signals
|
||||
if hasattr(self.app_instance, 'all_tasks_finished') and isinstance(self.app_instance.all_tasks_finished, Signal):
|
||||
self.app_instance.all_tasks_finished.connect(self._on_all_tasks_finished)
|
||||
else:
|
||||
logger.warning("App instance does not have 'all_tasks_finished' signal or it's not a Signal. Processing completion might not be detected.")
|
||||
|
||||
self._load_expected_rules()
|
||||
|
||||
def _load_expected_rules(self) -> None:
|
||||
"""Loads the expected rules from the JSON file specified by cli_args."""
|
||||
self.test_step = "LOADING_EXPECTED_RULES"
|
||||
logger.debug(f"Loading expected rules from: {self.cli_args.expectedrules}")
|
||||
try:
|
||||
with open(self.cli_args.expectedrules, 'r') as f:
|
||||
self.expected_rules_data = json.load(f)
|
||||
logger.debug("Expected rules loaded successfully.")
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Expected rules file not found: {self.cli_args.expectedrules}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error decoding expected rules JSON: {e}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred while loading expected rules: {e}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
|
||||
def run_test(self) -> None:
|
||||
"""Orchestrates the test steps."""
|
||||
logger.info("Starting test run...")
|
||||
|
||||
if not self.expected_rules_data: # Ensure rules were loaded
|
||||
logger.error("Expected rules not loaded. Aborting test.")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
# Add a specific summary log for essential context
|
||||
logger.info(f"Autotest Context: Input='{self.cli_args.zipfile.name}', Preset='{self.cli_args.preset}', Output='{self.cli_args.outputdir}'")
|
||||
|
||||
# Step 1: Load ZIP
|
||||
self.test_step = "LOADING_ZIP"
|
||||
logger.info(f"Step 1: Loading ZIP file: {self.cli_args.zipfile}") # KEEP INFO - Passes filter
|
||||
if not self.cli_args.zipfile.exists():
|
||||
logger.error(f"ZIP file not found: {self.cli_args.zipfile}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
try:
|
||||
# Assuming add_input_paths can take a list of strings or Path objects
|
||||
self.main_window.add_input_paths([str(self.cli_args.zipfile)])
|
||||
logger.debug("ZIP file loading initiated.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error during ZIP file loading: {e}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
# Step 2: Select Preset
|
||||
self.test_step = "SELECTING_PRESET"
|
||||
logger.info(f"Step 2: Selecting preset: {self.cli_args.preset}") # KEEP INFO - Passes filter
|
||||
preset_found = False
|
||||
preset_list_widget = self.main_window.preset_editor_widget.editor_preset_list
|
||||
for i in range(preset_list_widget.count()):
|
||||
item = preset_list_widget.item(i)
|
||||
if item and item.text() == self.cli_args.preset:
|
||||
preset_list_widget.setCurrentItem(item)
|
||||
logger.debug(f"Preset '{self.cli_args.preset}' selected.")
|
||||
preset_found = True
|
||||
break
|
||||
if not preset_found:
|
||||
logger.error(f"Preset '{self.cli_args.preset}' not found in the list.")
|
||||
available_presets = [preset_list_widget.item(i).text() for i in range(preset_list_widget.count())]
|
||||
logger.debug(f"Available presets: {available_presets}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
# Step 3: Await Prediction Completion
|
||||
self.test_step = "AWAITING_PREDICTION"
|
||||
logger.debug("Step 3: Awaiting prediction completion...")
|
||||
self.prediction_poll_timer.timeout.connect(self._check_prediction_status)
|
||||
self.prediction_poll_timer.start(500) # Poll every 500ms
|
||||
|
||||
# Use a QTimer to allow event loop to process while waiting for this step
|
||||
# This ensures that the _check_prediction_status can be called.
|
||||
# We will exit this event_loop from _check_prediction_status when prediction is done.
|
||||
logger.debug("Starting event loop for prediction...")
|
||||
self.event_loop.exec() # This loop is quit by _check_prediction_status
|
||||
self.prediction_poll_timer.stop()
|
||||
logger.debug("Event loop for prediction finished.")
|
||||
|
||||
|
||||
if self.test_step != "PREDICTION_COMPLETE":
|
||||
logger.error(f"Prediction did not complete as expected. Current step: {self.test_step}")
|
||||
# Check if there were any pending predictions that never cleared
|
||||
if hasattr(self.main_window, '_pending_predictions'):
|
||||
logger.error(f"Pending predictions at timeout: {self.main_window._pending_predictions}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
logger.info("Prediction completed successfully.") # KEEP INFO - Passes filter
|
||||
|
||||
# Step 4: Retrieve & Compare Rulelist
|
||||
self.test_step = "COMPARING_RULES"
|
||||
logger.info("Step 4: Retrieving and Comparing Rules...") # KEEP INFO - Passes filter
|
||||
actual_source_rules_list: List[SourceRule] = self.main_window.unified_model.get_all_source_rules()
|
||||
actual_rules_obj = actual_source_rules_list # Keep the SourceRule list for processing
|
||||
|
||||
comparable_actual_rules = self._convert_rules_to_comparable(actual_source_rules_list)
|
||||
|
||||
if not self._compare_rules(comparable_actual_rules, self.expected_rules_data):
|
||||
logger.error("Rule comparison failed. See logs for details.")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
logger.info("Rule comparison successful.") # KEEP INFO - Passes filter
|
||||
|
||||
# Step 5: Start Processing
|
||||
self.test_step = "START_PROCESSING"
|
||||
logger.info("Step 5: Starting Processing...") # KEEP INFO - Passes filter
|
||||
processing_settings = {
|
||||
"output_dir": str(self.cli_args.outputdir), # Ensure it's a string for JSON/config
|
||||
"overwrite": True,
|
||||
"workers": 1,
|
||||
"blender_enabled": False # Basic test, no Blender
|
||||
}
|
||||
try:
|
||||
Path(self.cli_args.outputdir).mkdir(parents=True, exist_ok=True)
|
||||
logger.debug(f"Ensured output directory exists: {self.cli_args.outputdir}")
|
||||
except Exception as e:
|
||||
logger.error(f"Could not create output directory {self.cli_args.outputdir}: {e}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
if hasattr(self.main_window, 'start_backend_processing') and isinstance(self.main_window.start_backend_processing, Signal):
|
||||
logger.debug(f"Emitting start_backend_processing with rules count: {len(actual_rules_obj)} and settings: {processing_settings}")
|
||||
self.main_window.start_backend_processing.emit(actual_rules_obj, processing_settings)
|
||||
else:
|
||||
logger.error("'start_backend_processing' signal not found on MainWindow. Cannot start processing.")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
# Step 6: Await Processing Completion
|
||||
self.test_step = "AWAIT_PROCESSING"
|
||||
logger.debug("Step 6: Awaiting processing completion...")
|
||||
self.event_loop.exec() # This loop is quit by _on_all_tasks_finished
|
||||
|
||||
if self.test_step != "PROCESSING_COMPLETE":
|
||||
logger.error(f"Processing did not complete as expected. Current step: {self.test_step}")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
logger.info("Processing completed.") # KEEP INFO - Passes filter
|
||||
|
||||
# Step 7: Check Output Path
|
||||
self.test_step = "CHECK_OUTPUT"
|
||||
logger.info(f"Step 7: Checking output path: {self.cli_args.outputdir}") # KEEP INFO - Passes filter
|
||||
output_path = Path(self.cli_args.outputdir)
|
||||
if not output_path.exists() or not output_path.is_dir():
|
||||
logger.error(f"Output directory {output_path} does not exist or is not a directory.")
|
||||
self.cleanup_and_exit(success=False)
|
||||
return
|
||||
|
||||
output_items = list(output_path.iterdir())
|
||||
if not output_items:
|
||||
logger.warning(f"Output directory {output_path} is empty. This might be a test failure depending on the case.")
|
||||
# For a more specific check, one might iterate through actual_rules_obj
|
||||
# and verify if subdirectories matching asset_name exist.
|
||||
# e.g. for asset_rule in source_rule.assets:
|
||||
# expected_asset_dir = output_path / asset_rule.asset_name
|
||||
# if not expected_asset_dir.is_dir(): logger.error(...)
|
||||
else:
|
||||
logger.debug(f"Found {len(output_items)} item(s) in output directory:")
|
||||
for item in output_items:
|
||||
logger.debug(f" - {item.name} ({'dir' if item.is_dir() else 'file'})")
|
||||
logger.info("Output path check completed.") # KEEP INFO - Passes filter
|
||||
|
||||
# Step 8: Retrieve & Analyze Logs
|
||||
self.test_step = "CHECK_LOGS"
|
||||
logger.debug("Step 8: Retrieving and Analyzing Logs...")
|
||||
all_logs_text = ""
|
||||
if self.main_window.log_console and self.main_window.log_console.log_console_output:
|
||||
all_logs_text = self.main_window.log_console.log_console_output.toPlainText()
|
||||
else:
|
||||
logger.warning("Log console or output widget not found. Cannot retrieve logs.")
|
||||
|
||||
self._process_and_display_logs(all_logs_text)
|
||||
logger.info("Log analysis completed.")
|
||||
|
||||
# Final Step
|
||||
logger.info("Test run completed successfully.") # KEEP INFO - Passes filter
|
||||
self.cleanup_and_exit(success=True)
|
||||
|
||||
@Slot()
|
||||
def _check_prediction_status(self) -> None:
|
||||
"""Polls the main window for pending predictions."""
|
||||
# logger.debug(f"Checking prediction status. Pending: {self.main_window._pending_predictions if hasattr(self.main_window, '_pending_predictions') else 'N/A'}")
|
||||
if hasattr(self.main_window, '_pending_predictions'):
|
||||
if not self.main_window._pending_predictions: # Assuming _pending_predictions is a list/dict that's empty when done
|
||||
logger.debug("No pending predictions. Prediction assumed complete.")
|
||||
self.test_step = "PREDICTION_COMPLETE"
|
||||
if self.event_loop.isRunning():
|
||||
self.event_loop.quit()
|
||||
# else:
|
||||
# logger.debug(f"Still awaiting predictions: {len(self.main_window._pending_predictions)} remaining.")
|
||||
else:
|
||||
logger.warning("'_pending_predictions' attribute not found on MainWindow. Cannot check prediction status automatically.")
|
||||
# As a fallback, if the attribute is missing, we might assume prediction is instant or needs manual check.
|
||||
# For now, let's assume it means it's done if the attribute is missing, but this is risky.
|
||||
# A better approach would be to have a clear signal from MainWindow when predictions are done.
|
||||
self.test_step = "PREDICTION_COMPLETE" # Risky assumption
|
||||
if self.event_loop.isRunning():
|
||||
self.event_loop.quit()
|
||||
|
||||
|
||||
@Slot(int, int, int)
|
||||
def _on_all_tasks_finished(self, processed_count: int, skipped_count: int, failed_count: int) -> None:
|
||||
"""Slot for App.all_tasks_finished signal."""
|
||||
logger.info(f"Signal 'all_tasks_finished' received: Processed={processed_count}, Skipped={skipped_count}, Failed={failed_count}") # KEEP INFO - Passes filter
|
||||
|
||||
if self.test_step == "AWAIT_PROCESSING":
|
||||
logger.debug("Processing completion signal received.") # Covered by the summary log above
|
||||
if failed_count > 0:
|
||||
logger.error(f"Processing finished with {failed_count} failed task(s).")
|
||||
# Even if tasks failed, the test might pass based on output checks.
|
||||
# The error is logged for information.
|
||||
self.test_step = "PROCESSING_COMPLETE"
|
||||
if self.event_loop.isRunning():
|
||||
self.event_loop.quit()
|
||||
else:
|
||||
logger.warning(f"Signal 'all_tasks_finished' received at an unexpected test step: '{self.test_step}'. Counts: P={processed_count}, S={skipped_count}, F={failed_count}")
|
||||
|
||||
|
||||
def _convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> Dict[str, Any]:
|
||||
"""
|
||||
Converts a list of SourceRule objects to a dictionary structure
|
||||
suitable for comparison with the expected_rules.json.
|
||||
"""
|
||||
logger.debug(f"Converting {len(source_rules_list)} SourceRule objects to comparable dictionary...")
|
||||
comparable_sources_list = []
|
||||
for source_rule_obj in source_rules_list:
|
||||
comparable_asset_list = []
|
||||
# source_rule_obj.assets is List[AssetRule]
|
||||
for asset_rule_obj in source_rule_obj.assets:
|
||||
comparable_file_list = []
|
||||
# asset_rule_obj.files is List[FileRule]
|
||||
for file_rule_obj in asset_rule_obj.files:
|
||||
comparable_file_list.append({
|
||||
"file_path": file_rule_obj.file_path,
|
||||
"item_type": file_rule_obj.item_type,
|
||||
"target_asset_name_override": file_rule_obj.target_asset_name_override
|
||||
})
|
||||
comparable_asset_list.append({
|
||||
"asset_name": asset_rule_obj.asset_name,
|
||||
"asset_type": asset_rule_obj.asset_type,
|
||||
"files": comparable_file_list
|
||||
})
|
||||
comparable_sources_list.append({
|
||||
"input_path": Path(source_rule_obj.input_path).name, # Use only the filename
|
||||
"supplier_identifier": source_rule_obj.supplier_identifier,
|
||||
"preset_name": source_rule_obj.preset_name,
|
||||
"assets": comparable_asset_list
|
||||
})
|
||||
logger.debug("Conversion to comparable dictionary finished.")
|
||||
return {"source_rules": comparable_sources_list}
|
||||
|
||||
def _compare_rule_item(self, actual_item: Dict[str, Any], expected_item: Dict[str, Any], item_type_name: str, parent_context: str = "") -> bool:
|
||||
"""
|
||||
Recursively compares an individual actual rule item dictionary with an expected rule item dictionary.
|
||||
Logs differences and returns True if they match, False otherwise.
|
||||
"""
|
||||
item_match = True
|
||||
|
||||
identifier = ""
|
||||
if item_type_name == "SourceRule":
|
||||
identifier = expected_item.get('input_path', f'UnknownSource_at_{parent_context}')
|
||||
elif item_type_name == "AssetRule":
|
||||
identifier = expected_item.get('asset_name', f'UnknownAsset_at_{parent_context}')
|
||||
elif item_type_name == "FileRule":
|
||||
identifier = expected_item.get('file_path', f'UnknownFile_at_{parent_context}')
|
||||
|
||||
current_context = f"{parent_context}/{identifier}" if parent_context else identifier
|
||||
|
||||
# Log Extra Fields: Iterate through keys in actual_item.
|
||||
# If a key is in actual_item but not in expected_item (and is not a list container like "assets" or "files"),
|
||||
# log this as an informational message.
|
||||
for key in actual_item.keys():
|
||||
if key not in expected_item and key not in ["assets", "files"]:
|
||||
logger.debug(f"Field '{key}' present in actual {item_type_name} ({current_context}) but not specified in expected. Value: '{actual_item[key]}'")
|
||||
|
||||
# Check Expected Fields: Iterate through keys in expected_item.
|
||||
for key, expected_value in expected_item.items():
|
||||
if key not in actual_item:
|
||||
logger.error(f"Missing expected field '{key}' in actual {item_type_name} ({current_context}).")
|
||||
item_match = False
|
||||
continue # Continue to check other fields in the expected_item
|
||||
|
||||
actual_value = actual_item[key]
|
||||
|
||||
if key == "assets": # List of AssetRule dictionaries
|
||||
if not self._compare_list_of_rules(actual_value, expected_value, "AssetRule", current_context, "asset_name"):
|
||||
item_match = False
|
||||
elif key == "files": # List of FileRule dictionaries
|
||||
if not self._compare_list_of_rules(actual_value, expected_value, "FileRule", current_context, "file_path"):
|
||||
item_match = False
|
||||
else: # Regular field comparison
|
||||
if actual_value != expected_value:
|
||||
# Handle None vs "None" string for preset_name specifically if it's a common issue
|
||||
if key == "preset_name" and actual_value is None and expected_value == "None":
|
||||
logger.debug(f"Field '{key}' in {item_type_name} ({current_context}): Actual is None, Expected is string \"None\". Treating as match for now.")
|
||||
elif key == "target_asset_name_override" and actual_value is not None and expected_value is None:
|
||||
# If actual has a value (e.g. parent asset name) and expected is null/None,
|
||||
# this is a mismatch according to strict comparison.
|
||||
# For a more lenient check, this logic could be adjusted here.
|
||||
# Current strict comparison will flag this as error, which is what the logs show.
|
||||
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
|
||||
item_match = False
|
||||
else:
|
||||
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
|
||||
item_match = False
|
||||
|
||||
return item_match
|
||||
|
||||
def _compare_list_of_rules(self, actual_list: List[Dict[str, Any]], expected_list: List[Dict[str, Any]], item_type_name: str, parent_context: str, item_key_field: str) -> bool:
|
||||
"""
|
||||
Compares a list of actual rule items against a list of expected rule items.
|
||||
Items are matched by a key field (e.g., 'asset_name' or 'file_path').
|
||||
Order independent for matching, but logs count mismatches.
|
||||
"""
|
||||
list_match = True # Corrected indentation
|
||||
if not isinstance(actual_list, list) or not isinstance(expected_list, list):
|
||||
logger.error(f"Type mismatch for list of {item_type_name}s in {parent_context}. Expected lists.")
|
||||
return False
|
||||
|
||||
if len(actual_list) != len(expected_list):
|
||||
logger.error(f"Mismatch in number of {item_type_name}s for {parent_context}. Actual: {len(actual_list)}, Expected: {len(expected_list)}.")
|
||||
list_match = False # Count mismatch is an error
|
||||
# If counts differ, we still try to match what we can to provide more detailed feedback,
|
||||
# but the overall list_match will remain False.
|
||||
|
||||
actual_items_map = {item.get(item_key_field): item for item in actual_list if item.get(item_key_field) is not None}
|
||||
|
||||
# Keep track of expected items that found a match to identify missing ones more easily
|
||||
matched_expected_keys = set()
|
||||
|
||||
for expected_item in expected_list:
|
||||
expected_key_value = expected_item.get(item_key_field)
|
||||
if expected_key_value is None:
|
||||
logger.error(f"Expected {item_type_name} in {parent_context} is missing key field '{item_key_field}'. Cannot compare this item: {expected_item}")
|
||||
list_match = False # This specific expected item cannot be processed
|
||||
continue
|
||||
|
||||
actual_item = actual_items_map.get(expected_key_value)
|
||||
if actual_item:
|
||||
matched_expected_keys.add(expected_key_value)
|
||||
if not self._compare_rule_item(actual_item, expected_item, item_type_name, parent_context):
|
||||
list_match = False # Individual item comparison failed
|
||||
else:
|
||||
logger.error(f"Expected {item_type_name} with {item_key_field} '{expected_key_value}' not found in actual items for {parent_context}.")
|
||||
list_match = False
|
||||
|
||||
# Identify actual items that were not matched by any expected item
|
||||
# This is useful if len(actual_list) >= len(expected_list) but some actual items are "extra"
|
||||
for actual_key_value, actual_item_data in actual_items_map.items():
|
||||
if actual_key_value not in matched_expected_keys:
|
||||
logger.debug(f"Extra actual {item_type_name} with {item_key_field} '{actual_key_value}' found in {parent_context} (not in expected list or already matched).")
|
||||
if len(actual_list) != len(expected_list): # If counts already flagged a mismatch, this is just detail
|
||||
pass
|
||||
else: # Counts matched, but content didn't align perfectly by key
|
||||
list_match = False
|
||||
|
||||
|
||||
return list_match # Corrected indentation
|
||||
|
||||
def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
Compares the actual rule data (converted from live SourceRule objects)
|
||||
with the expected rule data (loaded from JSON).
|
||||
"""
|
||||
logger.debug("Comparing actual rules with expected rules...")
|
||||
|
||||
actual_source_rules = actual_rules_data.get("source_rules", []) if actual_rules_data else []
|
||||
expected_source_rules = expected_rules_data.get("source_rules", []) if expected_rules_data else []
|
||||
|
||||
if not isinstance(actual_source_rules, list):
|
||||
logger.error(f"Actual 'source_rules' is not a list. Found type: {type(actual_source_rules)}. Comparison aborted.")
|
||||
return False # Cannot compare if actual data is malformed
|
||||
if not isinstance(expected_source_rules, list):
|
||||
logger.error(f"Expected 'source_rules' is not a list. Found type: {type(expected_source_rules)}. Test configuration error. Comparison aborted.")
|
||||
return False # Test setup error
|
||||
|
||||
if not expected_source_rules and not actual_source_rules:
|
||||
logger.debug("Both expected and actual source rules lists are empty. Considered a match.")
|
||||
return True
|
||||
|
||||
if len(actual_source_rules) != len(expected_source_rules):
|
||||
logger.error(f"Mismatch in the number of source rules. Actual: {len(actual_source_rules)}, Expected: {len(expected_source_rules)}.")
|
||||
# Optionally, log more details about which list is longer/shorter or identifiers if available
|
||||
return False
|
||||
|
||||
overall_match_status = True
|
||||
for i in range(len(expected_source_rules)):
|
||||
actual_sr = actual_source_rules[i]
|
||||
expected_sr = expected_source_rules[i]
|
||||
|
||||
# For context, use input_path or an index
|
||||
source_rule_context = expected_sr.get('input_path', f"SourceRule_index_{i}")
|
||||
|
||||
if not self._compare_rule_item(actual_sr, expected_sr, "SourceRule", parent_context=source_rule_context):
|
||||
overall_match_status = False
|
||||
# Continue checking other source rules to log all discrepancies
|
||||
|
||||
if overall_match_status:
|
||||
logger.debug("All rules match the expected criteria.") # Covered by "Rule comparison successful" summary
|
||||
else:
|
||||
logger.warning("One or more rules did not match the expected criteria. See logs above for details.")
|
||||
|
||||
return overall_match_status
|
||||
|
||||
def _process_and_display_logs(self, logs_text: str) -> None: # logs_text is no longer the primary source for search
|
||||
"""
|
||||
Processes and displays logs, potentially filtering them if --search is used.
|
||||
Also checks for tracebacks.
|
||||
Sources logs from the in-memory handler for search and detailed analysis.
|
||||
"""
|
||||
logger.debug("--- Log Analysis ---")
|
||||
global autotest_memory_handler # Access the global handler
|
||||
log_records = []
|
||||
if autotest_memory_handler and autotest_memory_handler.buffer:
|
||||
log_records = autotest_memory_handler.buffer
|
||||
|
||||
formatted_log_lines = []
|
||||
# Define a consistent formatter, similar to what might be expected or useful for search
|
||||
record_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
|
||||
# Default asctime format includes milliseconds.
|
||||
|
||||
|
||||
for record in log_records:
|
||||
formatted_log_lines.append(record_formatter.format(record))
|
||||
|
||||
lines_for_search_and_traceback = formatted_log_lines
|
||||
|
||||
if not lines_for_search_and_traceback:
|
||||
logger.warning("No log records found in memory handler. No analysis to perform.")
|
||||
# Still check the console logs_text for tracebacks if it exists, as a fallback
|
||||
# or if some critical errors didn't make it to the memory handler (unlikely with DEBUG level)
|
||||
if logs_text:
|
||||
logger.debug("Checking provided logs_text (from console) for tracebacks as a fallback.")
|
||||
console_lines = logs_text.splitlines()
|
||||
traceback_found_console = False
|
||||
for i, line in enumerate(console_lines):
|
||||
if line.strip().startswith("Traceback (most recent call last):"):
|
||||
logger.error(f"!!! TRACEBACK DETECTED in console logs_text around line {i+1} !!!")
|
||||
traceback_found_console = True
|
||||
if traceback_found_console:
|
||||
logger.warning("A traceback was found in the console logs_text.")
|
||||
else:
|
||||
logger.info("No tracebacks found in the console logs_text either.")
|
||||
logger.info("--- End Log Analysis ---")
|
||||
return
|
||||
|
||||
traceback_found = False
|
||||
|
||||
if self.cli_args.search:
|
||||
logger.info(f"Searching {len(lines_for_search_and_traceback)} in-memory log lines for term '{self.cli_args.search}' with {self.cli_args.additional_lines} context lines.")
|
||||
matched_line_indices = [i for i, line in enumerate(lines_for_search_and_traceback) if self.cli_args.search in line]
|
||||
|
||||
if not matched_line_indices:
|
||||
logger.info(f"Search term '{self.cli_args.search}' not found in in-memory logs.")
|
||||
else:
|
||||
logger.info(f"Found {len(matched_line_indices)} match(es) for '{self.cli_args.search}' in in-memory logs:")
|
||||
collected_lines_to_print = set()
|
||||
for match_idx in matched_line_indices:
|
||||
start_idx = max(0, match_idx - self.cli_args.additional_lines)
|
||||
end_idx = min(len(lines_for_search_and_traceback), match_idx + self.cli_args.additional_lines + 1)
|
||||
for i in range(start_idx, end_idx):
|
||||
# Use i directly as index for lines_for_search_and_traceback, line number is for display
|
||||
collected_lines_to_print.add(f"L{i+1:05d}: {lines_for_search_and_traceback[i]}")
|
||||
|
||||
print("--- Filtered Log Output (from Memory Handler) ---")
|
||||
for line_to_print in sorted(list(collected_lines_to_print)):
|
||||
print(line_to_print)
|
||||
print("--- End Filtered Log Output ---")
|
||||
# Removed: else block that showed last N lines by default (as per original instruction for this section)
|
||||
|
||||
# Traceback Check (on lines_for_search_and_traceback)
|
||||
for i, line in enumerate(lines_for_search_and_traceback):
            if line.strip().startswith("Traceback (most recent call last):") or "Traceback (most recent call last):" in line:  # More robust check
                logger.error(f"!!! TRACEBACK DETECTED in in-memory logs around line index {i} !!!")
                logger.error(f"Line content: {line}")
                traceback_found = True

        if traceback_found:
            logger.warning("A traceback was found in the in-memory logs. This usually indicates a significant issue.")
        else:
            logger.info("No tracebacks found in the in-memory logs.")  # This refers to the comprehensive memory logs

        logger.info("--- End Log Analysis ---")

    def cleanup_and_exit(self, success: bool = True) -> None:
        """Cleans up and exits the application."""
        global autotest_memory_handler
        if autotest_memory_handler:
            logger.debug("Clearing memory log handler buffer and removing handler.")
            autotest_memory_handler.buffer = []  # Clear buffer
            logging.getLogger().removeHandler(autotest_memory_handler)  # Remove handler
            autotest_memory_handler.close()  # MemoryHandler close is a no-op here, but closing is good practice
            autotest_memory_handler = None

        logger.info(f"Test {'succeeded' if success else 'failed'}. Cleaning up and exiting...")  # KEEP INFO - Passes filter
        q_app = QCoreApplication.instance()
        if q_app:
            q_app.quit()
        sys.exit(0 if success else 1)


# --- Main Execution ---

def main():
    """Main function to run the autotest script."""
    cli_args = parse_arguments()
    # Logger is configured above; this will now use the new filtered setup.
    logger.info(f"Parsed CLI arguments: {cli_args}")  # KEEP INFO - Passes filter

    # Clean the output directory and ensure it exists.
    output_dir_path = Path(cli_args.outputdir)
    logger.debug(f"Preparing output directory: {output_dir_path}")
    try:
        if output_dir_path.exists():
            logger.debug(f"Output directory {output_dir_path} exists. Cleaning its contents...")
            for item in output_dir_path.iterdir():
                if item.is_dir():
                    shutil.rmtree(item)
                    logger.debug(f"Removed directory: {item}")
                else:
                    item.unlink()
                    logger.debug(f"Removed file: {item}")
            logger.debug(f"Contents of {output_dir_path} cleaned.")
        else:
            logger.debug(f"Output directory {output_dir_path} does not exist. Creating it.")

        output_dir_path.mkdir(parents=True, exist_ok=True)  # Ensure it exists after cleaning / if it didn't exist
        logger.debug(f"Output directory {output_dir_path} is ready.")

    except Exception as e:
        logger.error(f"Could not prepare output directory {output_dir_path}: {e}", exc_info=True)
        sys.exit(1)

    # Initialize QApplication.
    # QCoreApplication would suffice if the test logic never touched GUI elements directly,
    # but QApplication is required when MainWindow or its widgets are constructed and used.
    # Since MainWindow is instantiated by App, QApplication is appropriate.
    q_app = QApplication.instance()
    if not q_app:
        q_app = QApplication(sys.argv)
    if not q_app:  # Still no app
        logger.error("Failed to initialize QApplication.")
        sys.exit(1)

    logger.debug("Initializing main.App()...")
    try:
        # Instantiate main.App() - this creates MainWindow but should not show it,
        # as long as App does not call app.main_window.show() by default.
        app_instance = App()
    except Exception as e:
        logger.error(f"Failed to initialize main.App: {e}", exc_info=True)
        sys.exit(1)

    if not app_instance.main_window:
        logger.error("main.App initialized, but main_window is None. Cannot proceed with test.")
        sys.exit(1)

    logger.debug("Initializing AutoTester...")
    try:
        tester = AutoTester(app_instance, cli_args)
    except Exception as e:
        logger.error(f"Failed to initialize AutoTester: {e}", exc_info=True)
        sys.exit(1)

    # Use QTimer.singleShot to start the test after the Qt event loop has started.
    # This ensures that the Qt environment is fully set up.
    logger.debug("Scheduling test run...")
    QTimer.singleShot(0, tester.run_test)

    logger.debug("Starting Qt application event loop...")
    exit_code = q_app.exec()
    logger.debug(f"Qt application event loop finished with exit code: {exit_code}")
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
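The cleanup step above tears down a global autotest_memory_handler whose buffer the log-analysis step scans for tracebacks. The handler's setup is not part of this excerpt; a minimal sketch of how it could be installed, assuming a standard logging.handlers.MemoryHandler attached to the root logger (the capacity and flush level below are illustrative, not taken from the source):

    import logging
    import logging.handlers

    # Assumed setup for the in-memory log capture used by the analysis above.
    # With target=None the handler never forwards records, so they accumulate
    # in .buffer until cleanup_and_exit() clears and removes the handler.
    autotest_memory_handler = logging.handlers.MemoryHandler(
        capacity=100000,                   # illustrative; large enough for a full test run
        flushLevel=logging.CRITICAL + 1,   # no record level ever triggers an auto-flush
        target=None,
    )
    logging.getLogger().addHandler(autotest_memory_handler)

For invocation, only --outputdir is visible in this excerpt (via cli_args.outputdir); the full flag set is whatever parse_arguments() defines.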
13
main.py
@ -401,11 +401,13 @@ class App(QObject):

        # --- Get paths needed for ProcessingTask ---
        try:
            # Access output path via MainPanelWidget
            output_base_path_str = self.main_window.main_panel_widget.output_path_edit.text().strip()
            # Get output_dir from processing_settings passed from autotest.py
            output_base_path_str = processing_settings.get("output_dir")
            log.info(f"APP_DEBUG: Received output_dir in processing_settings: {output_base_path_str}")

            if not output_base_path_str:
                log.error("Cannot queue tasks: Output directory path is empty in the GUI.")
                self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000)
                log.error("Cannot queue tasks: Output directory path is empty in processing_settings.")
                # self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000)  # GUI specific
                return
            output_base_path = Path(output_base_path_str)
            # Basic validation - check if it's likely a valid path structure (doesn't guarantee existence/writability here)
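For the processing_settings.get("output_dir") lookup above to succeed, the caller has to supply that key. A minimal sketch of how autotest.py might build and hand over the settings; the signal name and signature are assumptions, not confirmed by this diff:

    from pathlib import Path

    def trigger_processing(main_window, source_rules, output_dir: Path) -> None:
        """Illustrative only: kick off backend processing with an explicit output directory."""
        processing_settings = {"output_dir": str(output_dir)}  # key expected by App above
        # Assumed signal; replace with the real trigger exposed by MainWindow.
        main_window.start_backend_processing.emit(source_rules, processing_settings)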
@ -477,8 +479,9 @@ class App(QObject):
                engine=task_engine,
                rule=rule,
                workspace_path=workspace_path,
                output_base_path=output_base_path
                output_base_path=output_base_path  # This is Path(output_base_path_str)
            )
            log.info(f"APP_DEBUG: Passing to ProcessingTask: output_base_path = {output_base_path}")
            task.signals.finished.connect(self._on_task_finished)
            log.debug(f"DEBUG: Calling thread_pool.start() for task {i+1}")
            self.thread_pool.start(task)
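The hunk above queues one ProcessingTask per rule on the thread pool and wires each task's finished signal back to the App. A headless driver such as autotest.py still has to block until every task has reported in; one way to do that, sketched under the assumption that App exposes an all_tasks_finished signal:

    from PySide6.QtCore import QEventLoop

    def wait_for_all_tasks(app_instance) -> None:
        """Spin a local event loop until the application signals completion.

        Assumes app_instance provides an `all_tasks_finished` signal; adjust to the real API.
        """
        loop = QEventLoop()
        app_instance.all_tasks_finished.connect(loop.quit)
        loop.exec()  # queued signals/slots keep running while we wait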
@ -85,6 +85,7 @@ class MetadataInitializationStage(ProcessingStage):
    merged_maps_details.
    """
    def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
        logger.debug(f"METADATA_INIT_DEBUG: Entry - context.output_base_path = {context.output_base_path}")  # Added
        """
        Executes the metadata initialization logic.

@ -170,4 +171,5 @@ class MetadataInitializationStage(ProcessingStage):
        # Example of how you might log the full metadata for debugging:
        # logger.debug(f"Initialized metadata: {context.asset_metadata}")

        logger.debug(f"METADATA_INIT_DEBUG: Exit - context.output_base_path = {context.output_base_path}")  # Added
        return context
@ -17,8 +17,16 @@ class OutputOrganizationStage(ProcessingStage):
    """

    def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
        log.info("OUTPUT_ORG: Stage execution started for asset '%s'", context.asset_rule.asset_name)
        log.info(f"OUTPUT_ORG: context.processed_maps_details at start: {context.processed_maps_details}")
        asset_name_for_log_early = context.asset_rule.asset_name if hasattr(context, 'asset_rule') and context.asset_rule else "Unknown Asset (early)"
        log.info(f"OUTPUT_ORG_DEBUG: Stage execution started for asset '{asset_name_for_log_early}'.")
        logger.debug(f"OUTPUT_ORG_DEBUG: Entry - context.output_base_path = {context.output_base_path}")  # Modified
        log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj.output_directory_base (raw from config) = {getattr(context.config_obj, 'output_directory_base', 'N/A')}")
        # resolved_base = "N/A"
        # if hasattr(context.config_obj, '_settings') and context.config_obj._settings.get('OUTPUT_BASE_DIR'):
        #     base_dir_from_settings = context.config_obj._settings.get('OUTPUT_BASE_DIR')
        #     # Path resolution logic might be complex
        # log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj._settings.OUTPUT_BASE_DIR (resolved guess) = {resolved_base}")
        log.info(f"OUTPUT_ORG_DEBUG: context.processed_maps_details at start: {context.processed_maps_details}")
        """
        Copies temporary processed and merged files to their final output locations
        based on path patterns and updates AssetProcessingContext.
@ -103,7 +111,9 @@ class OutputOrganizationStage(ProcessingStage):
                    pattern_string=output_dir_pattern,
                    token_data=token_data_variant_cleaned
                )
                logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Using context.output_base_path = {context.output_base_path} for final_variant_path construction.")  # Added
                final_variant_path = Path(context.output_base_path) / Path(relative_dir_path_str_variant) / Path(output_filename_variant)
                logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Constructed final_variant_path = {final_variant_path}")  # Added
                final_variant_path.parent.mkdir(parents=True, exist_ok=True)

                if final_variant_path.exists() and not overwrite_existing:
@ -169,7 +179,9 @@ class OutputOrganizationStage(ProcessingStage):
                pattern_string=output_dir_pattern,
                token_data=token_data_cleaned
            )
            logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Using context.output_base_path = {context.output_base_path} for final_path construction.")  # Added
            final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
            logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Constructed final_path = {final_path}")  # Added
            final_path.parent.mkdir(parents=True, exist_ok=True)

            if final_path.exists() and not overwrite_existing:
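In both the variant and single-file branches, the destination is composed as output_base_path / resolved directory pattern / resolved filename. A tiny worked example with purely hypothetical values:

    from pathlib import Path

    # Hypothetical values, only to illustrate the composition used above.
    output_base_path = Path("D:/AssetOutput")
    relative_dir_path_str = "WoodPlanks01/Textures"   # result of the directory pattern resolution
    output_filename = "WoodPlanks01_albedo.png"       # result of the filename pattern resolution

    final_path = Path(output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
    # -> D:/AssetOutput/WoodPlanks01/Textures/WoodPlanks01_albedo.png
    final_path.parent.mkdir(parents=True, exist_ok=True)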
@ -245,10 +257,12 @@ class OutputOrganizationStage(ProcessingStage):
                    token_data=base_token_data_cleaned
                )
                # Destination: <output_base_path>/<asset_base_output_dir_str>/<extra_subdir_name>/<original_filename>
                logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Using context.output_base_path = {context.output_base_path} for final_dest_path construction.")  # Added
                final_dest_path = (Path(context.output_base_path) /
                                   Path(asset_base_output_dir_str) /
                                   Path(extra_subdir_name) /
                                   source_file_path.name)  # Use original filename
                logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Constructed final_dest_path = {final_dest_path}")  # Added

                final_dest_path.parent.mkdir(parents=True, exist_ok=True)